Prevent failures on partitioned distributed tables with statistics objects on PG 15
The comment from the code is clear on this:

    /*
     * The statistics objects of the distributed table are not relevant
     * for the distributed planning, so we can override it.
     *
     * Normally, we should not need this. However, the combination of
     * Postgres commit 269b532aef55a579ae02a3e8e8df14101570dfd9 and
     * the Citus function AdjustPartitioningForDistributedPlanning()
     * forces us to do this. The commit expects statistics objects
     * of partitions to have the "inh" flag set properly, whereas the
     * function overrides the "inh" flag. To prevent Postgres from
     * throwing an error, we override statlist such that Postgres does
     * not process any statistics objects during the standard_planner()
     * call on the coordinator. In the end, we do not need the
     * standard_planner() on the coordinator to generate an optimized
     * plan. We call into standard_planner() for other purposes, such as
     * generating the relationRestrictionContext here.
     *
     * AdjustPartitioningForDistributedPlanning() is a hack that we use
     * to prevent Postgres' standard_planner() from expanding all the
     * partitions for the distributed planning when a distributed
     * partitioned table is queried. It is required for both correctness
     * and performance reasons. Although we could eliminate the use of
     * the function for correctness (e.g., make sure that the rest of
     * the planner can handle partitions), its performance implication
     * is hard to avoid. Certain planning logic of Citus (such as router
     * or query pushdown) relies heavily on the relationRestrictionList.
     * If AdjustPartitioningForDistributedPlanning() is removed, all the
     * partitions show up in the relationRestrictionList, causing high
     * planning times for such queries.
     */
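To make the mechanism concrete, here is a minimal C sketch of the override, under stated assumptions: it is not the actual Citus hook (the real change lives in multi_relation_restriction_hook, shown in the diff below), the function name ClearStatlistForCitusRelation and the exact include paths are illustrative, and only IsCitusTable() and relOptInfo->statlist are taken from the change itself.

#include "postgres.h"

#include "nodes/parsenodes.h"            /* RangeTblEntry */
#include "nodes/pathnodes.h"             /* RelOptInfo */
#include "nodes/pg_list.h"               /* NIL */

#include "distributed/metadata_cache.h"  /* IsCitusTable() -- assumed header */

/* Hypothetical helper; in Citus the logic sits inside the planner hook. */
static void
ClearStatlistForCitusRelation(RelOptInfo *relOptInfo, RangeTblEntry *rte)
{
	if (IsCitusTable(rte->relid))
	{
		/*
		 * The coordinator's standard_planner() run is only used to build
		 * the relationRestrictionContext, not the final distributed plan,
		 * so extended statistics are irrelevant here. Clearing statlist
		 * keeps PG 15 from validating the "inh" flag that
		 * AdjustPartitioningForDistributedPlanning() overrides.
		 */
		relOptInfo->statlist = NIL;
	}
}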
parent 739b91afa6
commit 766f340ce0
@@ -1897,14 +1897,14 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
 	MemoryContext restrictionsMemoryContext = plannerRestrictionContext->memoryContext;
 	MemoryContext oldMemoryContext = MemoryContextSwitchTo(restrictionsMemoryContext);
 
-	bool distributedTable = IsCitusTable(rte->relid);
+	bool isCitusTable = IsCitusTable(rte->relid);
 
 	RelationRestriction *relationRestriction = palloc0(sizeof(RelationRestriction));
 	relationRestriction->index = restrictionIndex;
 	relationRestriction->relationId = rte->relid;
 	relationRestriction->rte = rte;
 	relationRestriction->relOptInfo = relOptInfo;
-	relationRestriction->distributedRelation = distributedTable;
+	relationRestriction->citusTable = isCitusTable;
 	relationRestriction->plannerInfo = root;
 
 	/* see comments on GetVarFromAssignedParam() */
@@ -1919,10 +1919,42 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
 	 * We're also keeping track of whether all participant
 	 * tables are reference tables.
 	 */
-	if (distributedTable)
+	if (isCitusTable)
 	{
 		cacheEntry = GetCitusTableCacheEntry(rte->relid);
 
+		/*
+		 * The statistics objects of the distributed table are not relevant
+		 * for the distributed planning, so we can override it.
+		 *
+		 * Normally, we should not need this. However, the combination of
+		 * Postgres commit 269b532aef55a579ae02a3e8e8df14101570dfd9 and
+		 * the Citus function AdjustPartitioningForDistributedPlanning()
+		 * forces us to do this. The commit expects statistics objects
+		 * of partitions to have the "inh" flag set properly, whereas the
+		 * function overrides the "inh" flag. To prevent Postgres from
+		 * throwing an error, we override statlist such that Postgres does
+		 * not process any statistics objects during the standard_planner()
+		 * call on the coordinator. In the end, we do not need the
+		 * standard_planner() on the coordinator to generate an optimized
+		 * plan. We call into standard_planner() for other purposes, such as
+		 * generating the relationRestrictionContext here.
+		 *
+		 * AdjustPartitioningForDistributedPlanning() is a hack that we use
+		 * to prevent Postgres' standard_planner() from expanding all the
+		 * partitions for the distributed planning when a distributed
+		 * partitioned table is queried. It is required for both correctness
+		 * and performance reasons. Although we could eliminate the use of
+		 * the function for correctness (e.g., make sure that the rest of
+		 * the planner can handle partitions), its performance implication
+		 * is hard to avoid. Certain planning logic of Citus (such as router
+		 * or query pushdown) relies heavily on the relationRestrictionList.
+		 * If AdjustPartitioningForDistributedPlanning() is removed, all the
+		 * partitions show up in the relationRestrictionList, causing high
+		 * planning times for such queries.
+		 */
+		relOptInfo->statlist = NIL;
+
 		relationRestrictionContext->allReferenceTables &=
 			IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE);
 	}
@@ -3692,7 +3692,7 @@ CopyRelationRestrictionContext(RelationRestrictionContext *oldContext)
 
 		newRestriction->index = oldRestriction->index;
 		newRestriction->relationId = oldRestriction->relationId;
-		newRestriction->distributedRelation = oldRestriction->distributedRelation;
+		newRestriction->citusTable = oldRestriction->citusTable;
 		newRestriction->rte = copyObject(oldRestriction->rte);
 
 		/* can't be copied, we copy (flatly) a RelOptInfo, and then decouple baserestrictinfo */
@@ -224,7 +224,7 @@ ContextContainsLocalRelation(RelationRestrictionContext *restrictionContext)
 	{
 		RelationRestriction *relationRestriction = lfirst(relationRestrictionCell);
 
-		if (!relationRestriction->distributedRelation)
+		if (!relationRestriction->citusTable)
 		{
 			return true;
 		}
@@ -56,7 +56,7 @@ typedef struct RelationRestriction
 {
 	Index index;
 	Oid relationId;
-	bool distributedRelation;
+	bool citusTable;
 	RangeTblEntry *rte;
 	RelOptInfo *relOptInfo;
 	PlannerInfo *plannerInfo;
@@ -521,9 +521,9 @@ SELECT tablename, indexname FROM pg_indexes WHERE schemaname = 'fix_idx_names' O
 tablename | indexname
 ---------------------------------------------------------------------
 date_partitioned_citus_local_table | date_partitioned_citus_local_table_measureid_idx
-date_partitioned_citus_local_table_361369 | date_partitioned_citus_local_table_measureid_idx_361369
+date_partitioned_citus_local_table_361377 | date_partitioned_citus_local_table_measureid_idx_361377
 partition_local_table | partition_local_table_measureid_idx
-partition_local_table_361370 | partition_local_table_measureid_idx_361370
+partition_local_table_361378 | partition_local_table_measureid_idx_361378
 (4 rows)
 
 -- creating a single object should only need to trigger fixing the single object
@@ -753,7 +753,7 @@ DETAIL: drop cascades to table not_partitioned
 drop cascades to table not_distributed
 drop cascades to table fk_table
 drop cascades to table p
-drop cascades to table date_partitioned_citus_local_table_361369
+drop cascades to table date_partitioned_citus_local_table_361377
 drop cascades to table date_partitioned_citus_local_table
 drop cascades to table parent_table
 SELECT citus_remove_node('localhost', :master_port);
@@ -4324,12 +4324,66 @@ WHERE schemaname = 'partitioning_schema' AND tablename ilike '%part_table_with_%'
 (6 rows)
 
 \c - - - :master_port
+SET search_path TO partitioning_schema;
+-- create parent table
+CREATE TABLE stxdinp(i int, a int, b int) PARTITION BY RANGE (i);
+-- create partition
+CREATE TABLE stxdinp1 PARTITION OF stxdinp FOR VALUES FROM (1) TO (100);
+-- populate table
+INSERT INTO stxdinp SELECT 1, a/100, a/100 FROM generate_series(1, 999) a;
+-- create extended statistics
+CREATE STATISTICS stxdinp ON a, b FROM stxdinp;
+-- distribute parent table
+SELECT create_distributed_table('stxdinp', 'i');
+NOTICE: Copying data from local table...
+NOTICE: copying the data has completed
+DETAIL: The local data in the table is no longer visible, but is still on disk.
+HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$partitioning_schema.stxdinp1$$)
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+-- run select query, works fine
+SELECT a, b FROM stxdinp GROUP BY 1, 2;
+ a | b
+---------------------------------------------------------------------
+ 1 | 1
+ 3 | 3
+ 7 | 7
+ 2 | 2
+ 8 | 8
+ 0 | 0
+ 5 | 5
+ 6 | 6
+ 9 | 9
+ 4 | 4
+(10 rows)
+
+-- partitions are processed recursively for PG15+
+VACUUM ANALYZE stxdinp;
+SELECT a, b FROM stxdinp GROUP BY 1, 2;
+ a | b
+---------------------------------------------------------------------
+ 1 | 1
+ 3 | 3
+ 7 | 7
+ 2 | 2
+ 8 | 8
+ 0 | 0
+ 5 | 5
+ 6 | 6
+ 9 | 9
+ 4 | 4
+(10 rows)
+
 DROP SCHEMA partitioning_schema CASCADE;
-NOTICE: drop cascades to 4 other objects
-DETAIL: drop cascades to table partitioning_schema."schema-test"
-drop cascades to table partitioning_schema.another_distributed_table
-drop cascades to table partitioning_schema.distributed_parent_table
-drop cascades to table partitioning_schema.part_table_with_very_long_name
+NOTICE: drop cascades to 5 other objects
+DETAIL: drop cascades to table "schema-test"
+drop cascades to table another_distributed_table
+drop cascades to table distributed_parent_table
+drop cascades to table part_table_with_very_long_name
+drop cascades to table stxdinp
 RESET search_path;
 DROP TABLE IF EXISTS
 	partitioning_hash_test,
@@ -2002,6 +2002,30 @@ SELECT tablename, indexname FROM pg_indexes
 WHERE schemaname = 'partitioning_schema' AND tablename ilike '%part_table_with_%' ORDER BY 1, 2;
 
 \c - - - :master_port
+SET search_path TO partitioning_schema;
+
+-- create parent table
+CREATE TABLE stxdinp(i int, a int, b int) PARTITION BY RANGE (i);
+
+-- create partition
+CREATE TABLE stxdinp1 PARTITION OF stxdinp FOR VALUES FROM (1) TO (100);
+
+-- populate table
+INSERT INTO stxdinp SELECT 1, a/100, a/100 FROM generate_series(1, 999) a;
+
+-- create extended statistics
+CREATE STATISTICS stxdinp ON a, b FROM stxdinp;
+
+-- distribute parent table
+SELECT create_distributed_table('stxdinp', 'i');
+
+-- run select query, works fine
+SELECT a, b FROM stxdinp GROUP BY 1, 2;
+
+-- partitions are processed recursively for PG15+
+VACUUM ANALYZE stxdinp;
+SELECT a, b FROM stxdinp GROUP BY 1, 2;
+
 DROP SCHEMA partitioning_schema CASCADE;
 RESET search_path;
 DROP TABLE IF EXISTS