From 766f340ce05d369e956dfe165c033e92fd81dd44 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 14 Sep 2022 15:59:49 +0300 Subject: [PATCH] Prevent failures on partitioned distributed tables with statistics objects on PG 15 Comment from the code is clear on this: /* * The statistics objects of the distributed table are not relevant * for the distributed planning, so we can override it. * * Normally, we should not need this. However, the combination of * Postgres commit 269b532aef55a579ae02a3e8e8df14101570dfd9 and * Citus function AdjustPartitioningForDistributedPlanning() * forces us to do this. The commit expects statistics objects * of partitions to have "inh" flag set properly. Whereas, the * function overrides "inh" flag. To avoid Postgres to throw error, * we override statlist such that Postgres does not try to process * any statistics objects during the standard_planner() on the * coordinator. In the end, we do not need the standard_planner() * on the coordinator to generate an optimized plan. We call * into standard_planner() for other purposes, such as generating the * relationRestrictionContext here. * * AdjustPartitioningForDistributedPlanning() is a hack that we use * to prevent Postgres' standard_planner() to expand all the partitions * for the distributed planning when a distributed partitioned table * is queried. It is required for both correctness and performance * reasons. Although we can eliminate the use of the function for * the correctness (e.g., make sure that rest of the planner can handle * partitions), it's performance implication is hard to avoid. Certain * planning logic of Citus (such as router or query pushdown) relies * heavily on the relationRestrictionList. If * AdjustPartitioningForDistributedPlanning() is removed, all the * partitions show up in the, causing high planning times for * such queries. */ --- .../distributed/planner/distributed_planner.c | 38 ++++++++++- .../planner/multi_router_planner.c | 2 +- .../relation_restriction_equivalence.c | 2 +- src/include/distributed/distributed_planner.h | 2 +- .../multi_fix_partition_shard_index_names.out | 6 +- .../regress/expected/multi_partitioning.out | 64 +++++++++++++++++-- src/test/regress/sql/multi_partitioning.sql | 24 +++++++ 7 files changed, 124 insertions(+), 14 deletions(-) diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 3f431f514..c6713399b 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -1897,14 +1897,14 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, MemoryContext restrictionsMemoryContext = plannerRestrictionContext->memoryContext; MemoryContext oldMemoryContext = MemoryContextSwitchTo(restrictionsMemoryContext); - bool distributedTable = IsCitusTable(rte->relid); + bool isCitusTable = IsCitusTable(rte->relid); RelationRestriction *relationRestriction = palloc0(sizeof(RelationRestriction)); relationRestriction->index = restrictionIndex; relationRestriction->relationId = rte->relid; relationRestriction->rte = rte; relationRestriction->relOptInfo = relOptInfo; - relationRestriction->distributedRelation = distributedTable; + relationRestriction->citusTable = isCitusTable; relationRestriction->plannerInfo = root; /* see comments on GetVarFromAssignedParam() */ @@ -1919,10 +1919,42 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, * We're also keeping track of whether all participant * tables are reference tables. */ - if (distributedTable) + if (isCitusTable) { cacheEntry = GetCitusTableCacheEntry(rte->relid); + /* + * The statistics objects of the distributed table are not relevant + * for the distributed planning, so we can override it. + * + * Normally, we should not need this. However, the combination of + * Postgres commit 269b532aef55a579ae02a3e8e8df14101570dfd9 and + * Citus function AdjustPartitioningForDistributedPlanning() + * forces us to do this. The commit expects statistics objects + * of partitions to have "inh" flag set properly. Whereas, the + * function overrides "inh" flag. To avoid Postgres to throw error, + * we override statlist such that Postgres does not try to process + * any statistics objects during the standard_planner() on the + * coordinator. In the end, we do not need the standard_planner() + * on the coordinator to generate an optimized plan. We call + * into standard_planner() for other purposes, such as generating the + * relationRestrictionContext here. + * + * AdjustPartitioningForDistributedPlanning() is a hack that we use + * to prevent Postgres' standard_planner() to expand all the partitions + * for the distributed planning when a distributed partitioned table + * is queried. It is required for both correctness and performance + * reasons. Although we can eliminate the use of the function for + * the correctness (e.g., make sure that rest of the planner can handle + * partitions), it's performance implication is hard to avoid. Certain + * planning logic of Citus (such as router or query pushdown) relies + * heavily on the relationRestrictionList. If + * AdjustPartitioningForDistributedPlanning() is removed, all the + * partitions show up in the, causing high planning times for + * such queries. + */ + relOptInfo->statlist = NIL; + relationRestrictionContext->allReferenceTables &= IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE); } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 7335d1c2d..325c382c7 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -3692,7 +3692,7 @@ CopyRelationRestrictionContext(RelationRestrictionContext *oldContext) newRestriction->index = oldRestriction->index; newRestriction->relationId = oldRestriction->relationId; - newRestriction->distributedRelation = oldRestriction->distributedRelation; + newRestriction->citusTable = oldRestriction->citusTable; newRestriction->rte = copyObject(oldRestriction->rte); /* can't be copied, we copy (flatly) a RelOptInfo, and then decouple baserestrictinfo */ diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index ed0dd8195..ff6b90dc2 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -224,7 +224,7 @@ ContextContainsLocalRelation(RelationRestrictionContext *restrictionContext) { RelationRestriction *relationRestriction = lfirst(relationRestrictionCell); - if (!relationRestriction->distributedRelation) + if (!relationRestriction->citusTable) { return true; } diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index ab1787fbf..31de463f0 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -56,7 +56,7 @@ typedef struct RelationRestriction { Index index; Oid relationId; - bool distributedRelation; + bool citusTable; RangeTblEntry *rte; RelOptInfo *relOptInfo; PlannerInfo *plannerInfo; diff --git a/src/test/regress/expected/multi_fix_partition_shard_index_names.out b/src/test/regress/expected/multi_fix_partition_shard_index_names.out index 99f603541..2c7331274 100644 --- a/src/test/regress/expected/multi_fix_partition_shard_index_names.out +++ b/src/test/regress/expected/multi_fix_partition_shard_index_names.out @@ -521,9 +521,9 @@ SELECT tablename, indexname FROM pg_indexes WHERE schemaname = 'fix_idx_names' O tablename | indexname --------------------------------------------------------------------- date_partitioned_citus_local_table | date_partitioned_citus_local_table_measureid_idx - date_partitioned_citus_local_table_361369 | date_partitioned_citus_local_table_measureid_idx_361369 + date_partitioned_citus_local_table_361377 | date_partitioned_citus_local_table_measureid_idx_361377 partition_local_table | partition_local_table_measureid_idx - partition_local_table_361370 | partition_local_table_measureid_idx_361370 + partition_local_table_361378 | partition_local_table_measureid_idx_361378 (4 rows) -- creating a single object should only need to trigger fixing the single object @@ -753,7 +753,7 @@ DETAIL: drop cascades to table not_partitioned drop cascades to table not_distributed drop cascades to table fk_table drop cascades to table p -drop cascades to table date_partitioned_citus_local_table_361369 +drop cascades to table date_partitioned_citus_local_table_361377 drop cascades to table date_partitioned_citus_local_table drop cascades to table parent_table SELECT citus_remove_node('localhost', :master_port); diff --git a/src/test/regress/expected/multi_partitioning.out b/src/test/regress/expected/multi_partitioning.out index 6c8365f85..fa462aaaf 100644 --- a/src/test/regress/expected/multi_partitioning.out +++ b/src/test/regress/expected/multi_partitioning.out @@ -4324,12 +4324,66 @@ WHERE schemaname = 'partitioning_schema' AND tablename ilike '%part_table_with_% (6 rows) \c - - - :master_port +SET search_path TO partitioning_schema; +-- create parent table +CREATE TABLE stxdinp(i int, a int, b int) PARTITION BY RANGE (i); +-- create partition +CREATE TABLE stxdinp1 PARTITION OF stxdinp FOR VALUES FROM (1) TO (100); +-- populate table +INSERT INTO stxdinp SELECT 1, a/100, a/100 FROM generate_series(1, 999) a; +-- create extended statistics +CREATE STATISTICS stxdinp ON a, b FROM stxdinp; +-- distribute parent table +SELECT create_distributed_table('stxdinp', 'i'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$partitioning_schema.stxdinp1$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- run select query, works fine +SELECT a, b FROM stxdinp GROUP BY 1, 2; + a | b +--------------------------------------------------------------------- + 1 | 1 + 3 | 3 + 7 | 7 + 2 | 2 + 8 | 8 + 0 | 0 + 5 | 5 + 6 | 6 + 9 | 9 + 4 | 4 +(10 rows) + +-- partitions are processed recursively for PG15+ +VACUUM ANALYZE stxdinp; +SELECT a, b FROM stxdinp GROUP BY 1, 2; + a | b +--------------------------------------------------------------------- + 1 | 1 + 3 | 3 + 7 | 7 + 2 | 2 + 8 | 8 + 0 | 0 + 5 | 5 + 6 | 6 + 9 | 9 + 4 | 4 +(10 rows) + DROP SCHEMA partitioning_schema CASCADE; -NOTICE: drop cascades to 4 other objects -DETAIL: drop cascades to table partitioning_schema."schema-test" -drop cascades to table partitioning_schema.another_distributed_table -drop cascades to table partitioning_schema.distributed_parent_table -drop cascades to table partitioning_schema.part_table_with_very_long_name +NOTICE: drop cascades to 5 other objects +DETAIL: drop cascades to table "schema-test" +drop cascades to table another_distributed_table +drop cascades to table distributed_parent_table +drop cascades to table part_table_with_very_long_name +drop cascades to table stxdinp RESET search_path; DROP TABLE IF EXISTS partitioning_hash_test, diff --git a/src/test/regress/sql/multi_partitioning.sql b/src/test/regress/sql/multi_partitioning.sql index cbfa20440..85a3ece66 100644 --- a/src/test/regress/sql/multi_partitioning.sql +++ b/src/test/regress/sql/multi_partitioning.sql @@ -2002,6 +2002,30 @@ SELECT tablename, indexname FROM pg_indexes WHERE schemaname = 'partitioning_schema' AND tablename ilike '%part_table_with_%' ORDER BY 1, 2; \c - - - :master_port +SET search_path TO partitioning_schema; + +-- create parent table +CREATE TABLE stxdinp(i int, a int, b int) PARTITION BY RANGE (i); + +-- create partition +CREATE TABLE stxdinp1 PARTITION OF stxdinp FOR VALUES FROM (1) TO (100); + +-- populate table +INSERT INTO stxdinp SELECT 1, a/100, a/100 FROM generate_series(1, 999) a; + +-- create extended statistics +CREATE STATISTICS stxdinp ON a, b FROM stxdinp; + +-- distribute parent table +SELECT create_distributed_table('stxdinp', 'i'); + +-- run select query, works fine +SELECT a, b FROM stxdinp GROUP BY 1, 2; + +-- partitions are processed recursively for PG15+ +VACUUM ANALYZE stxdinp; +SELECT a, b FROM stxdinp GROUP BY 1, 2; + DROP SCHEMA partitioning_schema CASCADE; RESET search_path; DROP TABLE IF EXISTS