From 110b4192b2277144fb6716dc5323874f52255e20 Mon Sep 17 00:00:00 2001 From: Jelte Fennema-Nio Date: Mon, 15 Apr 2024 16:26:33 +0200 Subject: [PATCH] Fix PG upgrades when invalid rebalance strategies exist (#7580) DESCRIPTION: Fix PG upgrades when invalid rebalance strategies exist Without this change an upgrade of a cluster with an invalid rebalance strategy would fail with an error like this: ``` cache lookup failed for shard_cost_function with oid 6077337 CONTEXT: SQL statement "SELECT citus_validate_rebalance_strategy_functions( NEW.shard_cost_function, NEW.node_capacity_function, NEW.shard_allowed_on_node_function)" PL/pgSQL function citus_internal.pg_dist_rebalance_strategy_trigger_func() line 5 at PERFORM SQL statement "INSERT INTO pg_catalog.pg_dist_rebalance_strategy SELECT name, default_strategy, shard_cost_function::regprocedure::regproc, node_capacity_function::regprocedure::regproc, shard_allowed_on_node_function::regprocedure::regproc, default_threshold, minimum_threshold, improvement_threshold FROM public.pg_dist_rebalance_strategy" PL/pgSQL function citus_finish_pg_upgrade() line 115 at SQL statement ``` This fixes that by disabling the trigger and simply re-inserting the invalid rebalance strategy without checking. We could also silently remove it, but this seems nicer. --- .../distributed/sql/citus--12.1-1--12.2-1.sql | 1 + .../sql/downgrades/citus--12.2-1--12.1-1.sql | 1 + .../udfs/citus_finish_pg_upgrade/12.2-1.sql | 227 ++++++++++++++++++ .../udfs/citus_finish_pg_upgrade/latest.sql | 7 + .../upgrade_rebalance_strategy_after.out | 11 +- .../upgrade_rebalance_strategy_before.out | 20 ++ .../sql/upgrade_rebalance_strategy_before.sql | 16 ++ 7 files changed, 278 insertions(+), 5 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/12.2-1.sql diff --git a/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql b/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql index 2d5f88676..1bec0f429 100644 --- a/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql +++ b/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql @@ -54,3 +54,4 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits; #include "udfs/citus_internal_update_placement_metadata/12.2-1.sql" #include "udfs/citus_internal_update_relation_colocation/12.2-1.sql" #include "udfs/repl_origin_helper/12.2-1.sql" +#include "udfs/citus_finish_pg_upgrade/12.2-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql b/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql index 581c65ea8..099bf8d87 100644 --- a/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--12.2-1--12.1-1.sql @@ -54,3 +54,4 @@ DROP FUNCTION citus_internal.update_relation_colocation(oid, int); DROP FUNCTION citus_internal.start_replication_origin_tracking(); DROP FUNCTION citus_internal.stop_replication_origin_tracking(); DROP FUNCTION citus_internal.is_replication_origin_tracking_active(); +#include "../udfs/citus_finish_pg_upgrade/12.1-1.sql" diff --git a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/12.2-1.sql b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/12.2-1.sql new file mode 100644 index 000000000..4d3a17bd4 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/12.2-1.sql @@ -0,0 +1,227 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_finish_pg_upgrade() + RETURNS void + LANGUAGE plpgsql + SET search_path = pg_catalog + AS $cppu$ +DECLARE + table_name regclass; + command text; + trigger_name text; +BEGIN + + + IF substring(current_Setting('server_version'), '\d+')::int >= 14 THEN + EXECUTE $cmd$ + -- disable propagation to prevent EnsureCoordinator errors + -- the aggregate created here does not depend on Citus extension (yet) + -- since we add the dependency with the next command + SET citus.enable_ddl_propagation TO OFF; + CREATE AGGREGATE array_cat_agg(anycompatiblearray) (SFUNC = array_cat, STYPE = anycompatiblearray); + COMMENT ON AGGREGATE array_cat_agg(anycompatiblearray) + IS 'concatenate input arrays into a single array'; + RESET citus.enable_ddl_propagation; + $cmd$; + ELSE + EXECUTE $cmd$ + SET citus.enable_ddl_propagation TO OFF; + CREATE AGGREGATE array_cat_agg(anyarray) (SFUNC = array_cat, STYPE = anyarray); + COMMENT ON AGGREGATE array_cat_agg(anyarray) + IS 'concatenate input arrays into a single array'; + RESET citus.enable_ddl_propagation; + $cmd$; + END IF; + + -- + -- Citus creates the array_cat_agg but because of a compatibility + -- issue between pg13-pg14, we drop and create it during upgrade. + -- And as Citus creates it, there needs to be a dependency to the + -- Citus extension, so we create that dependency here. + -- We are not using: + -- ALTER EXENSION citus DROP/CREATE AGGREGATE array_cat_agg + -- because we don't have an easy way to check if the aggregate + -- exists with anyarray type or anycompatiblearray type. + + INSERT INTO pg_depend + SELECT + 'pg_proc'::regclass::oid as classid, + (SELECT oid FROM pg_proc WHERE proname = 'array_cat_agg') as objid, + 0 as objsubid, + 'pg_extension'::regclass::oid as refclassid, + (select oid from pg_extension where extname = 'citus') as refobjid, + 0 as refobjsubid , + 'e' as deptype; + + -- PG16 has its own any_value, so only create it pre PG16. + -- We can remove this part when we drop support for PG16 + IF substring(current_Setting('server_version'), '\d+')::int < 16 THEN + EXECUTE $cmd$ + -- disable propagation to prevent EnsureCoordinator errors + -- the aggregate created here does not depend on Citus extension (yet) + -- since we add the dependency with the next command + SET citus.enable_ddl_propagation TO OFF; + CREATE OR REPLACE FUNCTION pg_catalog.any_value_agg ( anyelement, anyelement ) + RETURNS anyelement AS $$ + SELECT CASE WHEN $1 IS NULL THEN $2 ELSE $1 END; + $$ LANGUAGE SQL STABLE; + + CREATE AGGREGATE pg_catalog.any_value ( + sfunc = pg_catalog.any_value_agg, + combinefunc = pg_catalog.any_value_agg, + basetype = anyelement, + stype = anyelement + ); + COMMENT ON AGGREGATE pg_catalog.any_value(anyelement) IS + 'Returns the value of any row in the group. It is mostly useful when you know there will be only 1 element.'; + RESET citus.enable_ddl_propagation; + -- + -- Citus creates the any_value aggregate but because of a compatibility + -- issue between pg15-pg16 -- any_value is created in PG16, we drop + -- and create it during upgrade IF upgraded version is less than 16. + -- And as Citus creates it, there needs to be a dependency to the + -- Citus extension, so we create that dependency here. + + INSERT INTO pg_depend + SELECT + 'pg_proc'::regclass::oid as classid, + (SELECT oid FROM pg_proc WHERE proname = 'any_value_agg') as objid, + 0 as objsubid, + 'pg_extension'::regclass::oid as refclassid, + (select oid from pg_extension where extname = 'citus') as refobjid, + 0 as refobjsubid , + 'e' as deptype; + + INSERT INTO pg_depend + SELECT + 'pg_proc'::regclass::oid as classid, + (SELECT oid FROM pg_proc WHERE proname = 'any_value') as objid, + 0 as objsubid, + 'pg_extension'::regclass::oid as refclassid, + (select oid from pg_extension where extname = 'citus') as refobjid, + 0 as refobjsubid , + 'e' as deptype; + $cmd$; + END IF; + + -- + -- restore citus catalog tables + -- + INSERT INTO pg_catalog.pg_dist_partition SELECT * FROM public.pg_dist_partition; + + -- if we are upgrading from PG14/PG15 to PG16+, + -- we need to regenerate the partkeys because they will include varnullingrels as well. + UPDATE pg_catalog.pg_dist_partition + SET partkey = column_name_to_column(pg_dist_partkeys_pre_16_upgrade.logicalrelid, col_name) + FROM public.pg_dist_partkeys_pre_16_upgrade + WHERE pg_dist_partkeys_pre_16_upgrade.logicalrelid = pg_dist_partition.logicalrelid; + DROP TABLE public.pg_dist_partkeys_pre_16_upgrade; + + INSERT INTO pg_catalog.pg_dist_shard SELECT * FROM public.pg_dist_shard; + INSERT INTO pg_catalog.pg_dist_placement SELECT * FROM public.pg_dist_placement; + INSERT INTO pg_catalog.pg_dist_node_metadata SELECT * FROM public.pg_dist_node_metadata; + INSERT INTO pg_catalog.pg_dist_node SELECT * FROM public.pg_dist_node; + INSERT INTO pg_catalog.pg_dist_local_group SELECT * FROM public.pg_dist_local_group; + INSERT INTO pg_catalog.pg_dist_transaction SELECT * FROM public.pg_dist_transaction; + INSERT INTO pg_catalog.pg_dist_colocation SELECT * FROM public.pg_dist_colocation; + INSERT INTO pg_catalog.pg_dist_cleanup SELECT * FROM public.pg_dist_cleanup; + INSERT INTO pg_catalog.pg_dist_schema SELECT schemaname::regnamespace, colocationid FROM public.pg_dist_schema; + -- enterprise catalog tables + INSERT INTO pg_catalog.pg_dist_authinfo SELECT * FROM public.pg_dist_authinfo; + INSERT INTO pg_catalog.pg_dist_poolinfo SELECT * FROM public.pg_dist_poolinfo; + + -- Temporarily disable trigger to check for validity of functions while + -- inserting. The current contents of the table might be invalid if one of + -- the functions was removed by the user without also removing the + -- rebalance strategy. Obviously that's not great, but it should be no + -- reason to fail the upgrade. + ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DISABLE TRIGGER pg_dist_rebalance_strategy_validation_trigger; + INSERT INTO pg_catalog.pg_dist_rebalance_strategy SELECT + name, + default_strategy, + shard_cost_function::regprocedure::regproc, + node_capacity_function::regprocedure::regproc, + shard_allowed_on_node_function::regprocedure::regproc, + default_threshold, + minimum_threshold, + improvement_threshold + FROM public.pg_dist_rebalance_strategy; + ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_validation_trigger; + + -- + -- drop backup tables + -- + DROP TABLE public.pg_dist_authinfo; + DROP TABLE public.pg_dist_colocation; + DROP TABLE public.pg_dist_local_group; + DROP TABLE public.pg_dist_node; + DROP TABLE public.pg_dist_node_metadata; + DROP TABLE public.pg_dist_partition; + DROP TABLE public.pg_dist_placement; + DROP TABLE public.pg_dist_poolinfo; + DROP TABLE public.pg_dist_shard; + DROP TABLE public.pg_dist_transaction; + DROP TABLE public.pg_dist_rebalance_strategy; + DROP TABLE public.pg_dist_cleanup; + DROP TABLE public.pg_dist_schema; + -- + -- reset sequences + -- + PERFORM setval('pg_catalog.pg_dist_shardid_seq', (SELECT MAX(shardid)+1 AS max_shard_id FROM pg_dist_shard), false); + PERFORM setval('pg_catalog.pg_dist_placement_placementid_seq', (SELECT MAX(placementid)+1 AS max_placement_id FROM pg_dist_placement), false); + PERFORM setval('pg_catalog.pg_dist_groupid_seq', (SELECT MAX(groupid)+1 AS max_group_id FROM pg_dist_node), false); + PERFORM setval('pg_catalog.pg_dist_node_nodeid_seq', (SELECT MAX(nodeid)+1 AS max_node_id FROM pg_dist_node), false); + PERFORM setval('pg_catalog.pg_dist_colocationid_seq', (SELECT MAX(colocationid)+1 AS max_colocation_id FROM pg_dist_colocation), false); + PERFORM setval('pg_catalog.pg_dist_operationid_seq', (SELECT MAX(operation_id)+1 AS max_operation_id FROM pg_dist_cleanup), false); + PERFORM setval('pg_catalog.pg_dist_cleanup_recordid_seq', (SELECT MAX(record_id)+1 AS max_record_id FROM pg_dist_cleanup), false); + PERFORM setval('pg_catalog.pg_dist_clock_logical_seq', (SELECT last_value FROM public.pg_dist_clock_logical_seq), false); + DROP TABLE public.pg_dist_clock_logical_seq; + + + + -- + -- register triggers + -- + FOR table_name IN SELECT logicalrelid FROM pg_catalog.pg_dist_partition JOIN pg_class ON (logicalrelid = oid) WHERE relkind <> 'f' + LOOP + trigger_name := 'truncate_trigger_' || table_name::oid; + command := 'create trigger ' || trigger_name || ' after truncate on ' || table_name || ' execute procedure pg_catalog.citus_truncate_trigger()'; + EXECUTE command; + command := 'update pg_trigger set tgisinternal = true where tgname = ' || quote_literal(trigger_name); + EXECUTE command; + END LOOP; + + -- + -- set dependencies + -- + INSERT INTO pg_depend + SELECT + 'pg_class'::regclass::oid as classid, + p.logicalrelid::regclass::oid as objid, + 0 as objsubid, + 'pg_extension'::regclass::oid as refclassid, + (select oid from pg_extension where extname = 'citus') as refobjid, + 0 as refobjsubid , + 'n' as deptype + FROM pg_catalog.pg_dist_partition p; + + -- set dependencies for columnar table access method + PERFORM columnar_internal.columnar_ensure_am_depends_catalog(); + + -- restore pg_dist_object from the stable identifiers + TRUNCATE pg_catalog.pg_dist_object; + INSERT INTO pg_catalog.pg_dist_object (classid, objid, objsubid, distribution_argument_index, colocationid) + SELECT + address.classid, + address.objid, + address.objsubid, + naming.distribution_argument_index, + naming.colocationid + FROM + public.pg_dist_object naming, + pg_catalog.pg_get_object_address(naming.type, naming.object_names, naming.object_args) address; + + DROP TABLE public.pg_dist_object; +END; +$cppu$; + +COMMENT ON FUNCTION pg_catalog.citus_finish_pg_upgrade() + IS 'perform tasks to restore citus settings from a location that has been prepared before pg_upgrade'; diff --git a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql index 766e86a2e..4d3a17bd4 100644 --- a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql @@ -128,6 +128,12 @@ BEGIN INSERT INTO pg_catalog.pg_dist_authinfo SELECT * FROM public.pg_dist_authinfo; INSERT INTO pg_catalog.pg_dist_poolinfo SELECT * FROM public.pg_dist_poolinfo; + -- Temporarily disable trigger to check for validity of functions while + -- inserting. The current contents of the table might be invalid if one of + -- the functions was removed by the user without also removing the + -- rebalance strategy. Obviously that's not great, but it should be no + -- reason to fail the upgrade. + ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DISABLE TRIGGER pg_dist_rebalance_strategy_validation_trigger; INSERT INTO pg_catalog.pg_dist_rebalance_strategy SELECT name, default_strategy, @@ -138,6 +144,7 @@ BEGIN minimum_threshold, improvement_threshold FROM public.pg_dist_rebalance_strategy; + ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_validation_trigger; -- -- drop backup tables diff --git a/src/test/regress/expected/upgrade_rebalance_strategy_after.out b/src/test/regress/expected/upgrade_rebalance_strategy_after.out index 4036af539..c7ea5cc4e 100644 --- a/src/test/regress/expected/upgrade_rebalance_strategy_after.out +++ b/src/test/regress/expected/upgrade_rebalance_strategy_after.out @@ -1,8 +1,9 @@ SELECT * FROM pg_catalog.pg_dist_rebalance_strategy ORDER BY name; - name | default_strategy | shard_cost_function | node_capacity_function | shard_allowed_on_node_function | default_threshold | minimum_threshold | improvement_threshold + name | default_strategy | shard_cost_function | node_capacity_function | shard_allowed_on_node_function | default_threshold | minimum_threshold | improvement_threshold --------------------------------------------------------------------- - by_disk_size | f | citus_shard_cost_by_disk_size | citus_node_capacity_1 | citus_shard_allowed_on_node_true | 0.1 | 0.01 | 0.5 - by_shard_count | f | citus_shard_cost_1 | citus_node_capacity_1 | citus_shard_allowed_on_node_true | 0 | 0 | 0 - custom_strategy | t | upgrade_rebalance_strategy.shard_cost_2 | upgrade_rebalance_strategy.capacity_high_worker_1 | upgrade_rebalance_strategy.only_worker_2 | 0.5 | 0.2 | 0.3 -(3 rows) + by_disk_size | f | citus_shard_cost_by_disk_size | citus_node_capacity_1 | citus_shard_allowed_on_node_true | 0.1 | 0.01 | 0.5 + by_shard_count | f | citus_shard_cost_1 | citus_node_capacity_1 | citus_shard_allowed_on_node_true | 0 | 0 | 0 + custom_strategy | t | upgrade_rebalance_strategy.shard_cost_2 | upgrade_rebalance_strategy.capacity_high_worker_1 | upgrade_rebalance_strategy.only_worker_2 | 0.5 | 0.2 | 0.3 + invalid_strategy | f | 1234567 | upgrade_rebalance_strategy.capacity_high_worker_1 | upgrade_rebalance_strategy.only_worker_2 | 0.5 | 0.2 | 0.3 +(4 rows) diff --git a/src/test/regress/expected/upgrade_rebalance_strategy_before.out b/src/test/regress/expected/upgrade_rebalance_strategy_before.out index cf1d122b3..85b458389 100644 --- a/src/test/regress/expected/upgrade_rebalance_strategy_before.out +++ b/src/test/regress/expected/upgrade_rebalance_strategy_before.out @@ -35,3 +35,23 @@ SELECT citus_set_default_rebalance_strategy('custom_strategy'); (1 row) +-- Disable the trigger temporarily to allow the invalid strategy to be added. +-- Normally an invalid strategy can end up in the table by deleting one of the +-- functions it depends on. But we do directly in this test because we want to +-- have a consistent OID, so we get consistent test output. +ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DISABLE TRIGGER pg_dist_rebalance_strategy_validation_trigger; +SELECT citus_add_rebalance_strategy( + 'invalid_strategy', + 1234567, + 'capacity_high_worker_1', + 'only_worker_2', + 0.5, + 0.2, + 0.3 + ); + citus_add_rebalance_strategy +--------------------------------------------------------------------- + +(1 row) + +ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_validation_trigger; diff --git a/src/test/regress/sql/upgrade_rebalance_strategy_before.sql b/src/test/regress/sql/upgrade_rebalance_strategy_before.sql index 458fb9cf6..be2012e9c 100644 --- a/src/test/regress/sql/upgrade_rebalance_strategy_before.sql +++ b/src/test/regress/sql/upgrade_rebalance_strategy_before.sql @@ -29,3 +29,19 @@ SELECT citus_add_rebalance_strategy( 0.3 ); SELECT citus_set_default_rebalance_strategy('custom_strategy'); + +-- Disable the trigger temporarily to allow the invalid strategy to be added. +-- Normally an invalid strategy can end up in the table by deleting one of the +-- functions it depends on. But we do directly in this test because we want to +-- have a consistent OID, so we get consistent test output. +ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DISABLE TRIGGER pg_dist_rebalance_strategy_validation_trigger; +SELECT citus_add_rebalance_strategy( + 'invalid_strategy', + 1234567, + 'capacity_high_worker_1', + 'only_worker_2', + 0.5, + 0.2, + 0.3 + ); +ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_validation_trigger;