mirror of https://github.com/citusdata/citus.git
Include all relevant relations in the ExtractRangeTableRelationWalker (#3135)
We changed the logic for pulling RTE_RELATIONs in #3109, and that caused issues with non-colocated subquery joins and partitioned tables. @onurctirtir found the failing steps, which I traced back to that change. While looking into it in more detail, we decided to expand the list so that callers get all the relevant RTE_RELATIONs: RELKIND_RELATION, RELKIND_PARTITIONED_TABLE, RELKIND_FOREIGN_TABLE and RELKIND_MATVIEW. These are all the relation kinds that the Citus planner is aware of.
parent d3f68bf44f
commit ffd89e4e01
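The change shifts responsibility to the callers: the walker now returns every relation kind the planner knows about, and each caller filters for what it needs (for example, DistributedRelationIdList() keeps only distributed tables, as the diff below shows). The following is a minimal sketch of that caller-side pattern, not code from the commit: the helper name CountDistributedRelations and the exact header paths are illustrative assumptions, while ExtractRangeTableRelationWalker(), IsDistributedTable(), lfirst() and RangeTblEntry->relid are taken from the diff that follows.

/*
 * Illustrative sketch only (not part of the commit): a hypothetical caller
 * that consumes the expanded walker output and keeps distributed tables.
 * Header paths below are assumptions about a Citus extension build.
 */
#include "postgres.h"

#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"

#include "distributed/metadata_cache.h"    /* IsDistributedTable(), assumed location */
#include "distributed/query_utils.h"       /* ExtractRangeTableRelationWalker(), assumed location */

static int
CountDistributedRelations(Query *queryTree)
{
    List *rangeTableList = NIL;
    ListCell *rangeTableCell = NULL;
    int distributedCount = 0;

    /*
     * After this commit the walker returns every relation kind the planner
     * is aware of: RELKIND_RELATION, RELKIND_PARTITIONED_TABLE,
     * RELKIND_FOREIGN_TABLE and RELKIND_MATVIEW.
     */
    ExtractRangeTableRelationWalker((Node *) queryTree, &rangeTableList);

    foreach(rangeTableCell, rangeTableList)
    {
        RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);

        /* the caller decides what to do with local tables and matviews */
        if (IsDistributedTable(rangeTableEntry->relid))
        {
            distributedCount++;
        }
    }

    return distributedCount;
}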
@@ -995,8 +995,8 @@ HasUnsupportedJoinWalker(Node *node, void *context)
 static bool
 ErrorHintRequired(const char *errorHint, Query *queryTree)
 {
-    List *rangeTableList = NIL;
-    ListCell *rangeTableCell = NULL;
+    List *distributedRelationIdList = DistributedRelationIdList(queryTree);
+    ListCell *relationIdCell = NULL;
     List *colocationIdList = NIL;
 
     if (errorHint == NULL)
@@ -1004,11 +1004,9 @@ ErrorHintRequired(const char *errorHint, Query *queryTree)
         return false;
     }
 
-    ExtractRangeTableRelationWalker((Node *) queryTree, &rangeTableList);
-    foreach(rangeTableCell, rangeTableList)
+    foreach(relationIdCell, distributedRelationIdList)
     {
-        RangeTblEntry *rte = (RangeTblEntry *) lfirst(rangeTableCell);
-        Oid relationId = rte->relid;
+        Oid relationId = lfirst_oid(relationIdCell);
         char partitionMethod = PartitionMethod(relationId);
         if (partitionMethod == DISTRIBUTE_BY_NONE)
         {
@@ -2216,7 +2216,7 @@ static void
 ErrorIfUnsupportedShardDistribution(Query *query)
 {
     Oid firstTableRelationId = InvalidOid;
-    List *relationIdList = RelationIdList(query);
+    List *relationIdList = DistributedRelationIdList(query);
     List *nonReferenceRelations = NIL;
     ListCell *relationIdCell = NULL;
     uint32 relationIndex = 0;
@@ -1686,7 +1686,7 @@ RelationRestrictionPartitionKeyIndex(RelationRestriction *relationRestriction)
  * RelationIdList returns list of unique relation ids in query tree.
  */
 List *
-RelationIdList(Query *query)
+DistributedRelationIdList(Query *query)
 {
     List *rangeTableList = NIL;
     List *tableEntryList = NIL;
@@ -1701,6 +1701,11 @@ RelationIdList(Query *query)
         TableEntry *tableEntry = (TableEntry *) lfirst(tableEntryCell);
         Oid relationId = tableEntry->relationId;
 
+        if (!IsDistributedTable(relationId))
+        {
+            continue;
+        }
+
         relationIdList = list_append_unique_oid(relationIdList, relationId);
     }
 
@@ -17,6 +17,10 @@
 #include "distributed/version_compat.h"
 #include "nodes/nodeFuncs.h"
 
+
+static bool CitusQueryableRangeTableRelation(RangeTblEntry *rangeTableEntry);
+
+
 /*
  * ExtractRangeTableList walks over a tree to gather entries.
  * Execution is parameterized by passing walkerMode flag via ExtractRangeTableWalkerContext
@@ -40,12 +44,9 @@ ExtractRangeTableList(Node *node, ExtractRangeTableWalkerContext *context)
     {
         RangeTblEntry *rangeTable = (RangeTblEntry *) node;
 
-        /* make sure that we are extracting only relation entries if walkerMode is set to EXTRACT_RELATION_ENTRIES */
-        if (walkerMode == EXTRACT_ALL_ENTRIES || (walkerMode ==
-                                                  EXTRACT_RELATION_ENTRIES &&
-                                                  rangeTable->rtekind == RTE_RELATION &&
-                                                  rangeTable->relkind ==
-                                                  RELKIND_RELATION))
+        if (walkerMode == EXTRACT_ALL_ENTRIES ||
+            (walkerMode == EXTRACT_RELATION_ENTRIES &&
+             CitusQueryableRangeTableRelation(rangeTable)))
         {
             (*rangeTableRelationList) = lappend(*rangeTableRelationList, rangeTable);
         }
@@ -81,9 +82,43 @@ ExtractRangeTableList(Node *node, ExtractRangeTableWalkerContext *context)
 }
 
 
+/*
+ * CitusQueryableRangeTableRelation returns true if the input range table
+ * entry is a relation and it can be used in a distributed query, including
+ * local tables and materialized views as well.
+ */
+static bool
+CitusQueryableRangeTableRelation(RangeTblEntry *rangeTableEntry)
+{
+    char relationKind = '\0';
+
+    if (rangeTableEntry->rtekind != RTE_RELATION)
+    {
+        /* we're only interested in relations */
+        return false;
+    }
+
+    relationKind = rangeTableEntry->relkind;
+    if (relationKind == RELKIND_RELATION || relationKind == RELKIND_PARTITIONED_TABLE ||
+        relationKind == RELKIND_FOREIGN_TABLE || relationKind == RELKIND_MATVIEW)
+    {
+        /*
+         * RELKIND_VIEW are automatically replaced with a subquery in
+         * the query tree, so we ignore them here.
+         *
+         * RELKIND_MATVIEW is equivalent of a local table in postgres.
+         */
+        return true;
+    }
+
+    return false;
+}
+
+
 /*
  * ExtractRangeTableRelationWalker gathers all range table relation entries
- * in a query.
+ * in a query. The caller is responsible for checking whether the returned
+ * entries are distributed or not.
  */
 bool
 ExtractRangeTableRelationWalker(Node *node, List **rangeTableRelationList)
@@ -17,7 +17,7 @@
 /* Enum to define execution flow of ExtractRangeTableList */
 typedef enum ExtractRangeTableMode
 {
-    EXTRACT_RELATION_ENTRIES,
+    EXTRACT_RELATION_ENTRIES, /* including local, foreign and partitioned tables */
     EXTRACT_ALL_ENTRIES
 } ExtractRangeTableMode;
 
@@ -31,7 +31,7 @@ extern List * GenerateAllAttributeEquivalences(PlannerRestrictionContext *
     plannerRestrictionContext);
 extern uint32 ReferenceRelationCount(RelationRestrictionContext *restrictionContext);
 
-extern List * RelationIdList(Query *query);
+extern List * DistributedRelationIdList(Query *query);
 extern PlannerRestrictionContext * FilterPlannerRestrictionForQuery(
     PlannerRestrictionContext *plannerRestrictionContext,
     Query *query);
@@ -319,6 +319,19 @@ INSERT INTO small VALUES(14, 14);
 -- delete statement with CTE
 WITH all_small_view_ids AS (SELECT id FROM small_view)
 DELETE FROM large_partitioned WHERE id in (SELECT * FROM all_small_view_ids);
+-- make sure that materialized view in a CTE/subquery can be joined with a distributed table
+WITH cte AS (SELECT *, random() FROM small_view) SELECT count(*) FROM cte JOIN small USING(id);
+ count
+-------
+     4
+(1 row)
+
+SELECT count(*) FROM (SELECT *, random() FROM small_view) as subquery JOIN small USING(id);
+ count
+-------
+     4
+(1 row)
+
 DROP TABLE large_partitioned;
 DROP TABLE small CASCADE;
 NOTICE: drop cascades to materialized view small_view
@@ -1054,8 +1054,44 @@ DEBUG: Plan is router executable
 DEBUG: generating subplan 109_6 for subquery SELECT intermediate_result.user_id, intermediate_result."time", intermediate_result.event_type, intermediate_result.value_2, intermediate_result.value_3, intermediate_result.value_4 FROM read_intermediate_result('109_4'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, "time" timestamp without time zone, event_type integer, value_2 integer, value_3 double precision, value_4 bigint) INTERSECT SELECT intermediate_result.user_id, intermediate_result."time", intermediate_result.event_type, intermediate_result.value_2, intermediate_result.value_3, intermediate_result.value_4 FROM read_intermediate_result('109_5'::text, 'binary'::citus_copy_format) intermediate_result(user_id integer, "time" timestamp without time zone, event_type integer, value_2 integer, value_3 double precision, value_4 bigint)
 DEBUG: Router planner cannot handle multi-shard select queries
 ERROR: cannot pushdown the subquery
+-- make sure that non-colocated subquery joins work fine in
+-- modifications
+CREATE TABLE table1 (id int, tenant_id int);
+CREATE VIEW table1_view AS SELECT * from table1 where id < 100;
+CREATE TABLE table2 (id int, tenant_id int) partition by range(tenant_id);
+CREATE TABLE table2_p1 PARTITION OF table2 FOR VALUES FROM (1) TO (10);
+-- modifications on the partitions are only allowed with rep=1
+SET citus.shard_replication_factor TO 1;
+SELECT create_distributed_table('table2','tenant_id');
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+SELECT create_distributed_table('table1','tenant_id');
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+-- all of the above queries are non-colocated subquery joins
+-- because the views are replaced with subqueries
+UPDATE table2 SET id=20 FROM table1_view WHERE table1_view.id=table2.id;
+DEBUG: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
+DEBUG: Router planner cannot handle multi-shard select queries
+DEBUG: generating subplan 117_1 for subquery SELECT table1.id, table1.tenant_id FROM non_colocated_subquery.table1 WHERE (table1.id OPERATOR(pg_catalog.<) 100)
+DEBUG: Plan 117 query after replacing subqueries and CTEs: UPDATE non_colocated_subquery.table2 SET id = 20 FROM (SELECT intermediate_result.id, intermediate_result.tenant_id FROM read_intermediate_result('117_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer, tenant_id integer)) table1_view WHERE (table1_view.id OPERATOR(pg_catalog.=) table2.id)
+DEBUG: Creating router plan
+DEBUG: Plan is router executable
+UPDATE table2_p1 SET id=20 FROM table1_view WHERE table1_view.id=table2_p1.id;
+DEBUG: complex joins are only supported when all distributed tables are joined on their distribution columns with equal operator
+DEBUG: Router planner cannot handle multi-shard select queries
+DEBUG: generating subplan 119_1 for subquery SELECT table1.id, table1.tenant_id FROM non_colocated_subquery.table1 WHERE (table1.id OPERATOR(pg_catalog.<) 100)
+DEBUG: Plan 119 query after replacing subqueries and CTEs: UPDATE non_colocated_subquery.table2_p1 SET id = 20 FROM (SELECT intermediate_result.id, intermediate_result.tenant_id FROM read_intermediate_result('119_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer, tenant_id integer)) table1_view WHERE (table1_view.id OPERATOR(pg_catalog.=) table2_p1.id)
+DEBUG: Creating router plan
+DEBUG: Plan is router executable
 RESET client_min_messages;
 DROP FUNCTION explain_json_2(text);
 SET search_path TO 'public';
 DROP SCHEMA non_colocated_subquery CASCADE;
-NOTICE: drop cascades to 2 other objects
+NOTICE: drop cascades to 5 other objects
@@ -246,5 +246,9 @@ INSERT INTO small VALUES(14, 14);
 WITH all_small_view_ids AS (SELECT id FROM small_view)
 DELETE FROM large_partitioned WHERE id in (SELECT * FROM all_small_view_ids);
 
+-- make sure that materialized view in a CTE/subquery can be joined with a distributed table
+WITH cte AS (SELECT *, random() FROM small_view) SELECT count(*) FROM cte JOIN small USING(id);
+SELECT count(*) FROM (SELECT *, random() FROM small_view) as subquery JOIN small USING(id);
+
 DROP TABLE large_partitioned;
 DROP TABLE small CASCADE;
@@ -782,6 +782,24 @@ SELECT count(*) FROM events_table WHERE user_id NOT IN
 );
 
 
+-- make sure that non-colocated subquery joins work fine in
+-- modifications
+CREATE TABLE table1 (id int, tenant_id int);
+CREATE VIEW table1_view AS SELECT * from table1 where id < 100;
+CREATE TABLE table2 (id int, tenant_id int) partition by range(tenant_id);
+CREATE TABLE table2_p1 PARTITION OF table2 FOR VALUES FROM (1) TO (10);
+
+-- modifications on the partitions are only allowed with rep=1
+SET citus.shard_replication_factor TO 1;
+
+SELECT create_distributed_table('table2','tenant_id');
+SELECT create_distributed_table('table1','tenant_id');
+
+-- all of the above queries are non-colocated subquery joins
+-- because the views are replaced with subqueries
+UPDATE table2 SET id=20 FROM table1_view WHERE table1_view.id=table2.id;
+UPDATE table2_p1 SET id=20 FROM table1_view WHERE table1_view.id=table2_p1.id;
+
 RESET client_min_messages;
 DROP FUNCTION explain_json_2(text);