mirror of https://github.com/citusdata/citus.git
Fix PG upgrades when invalid rebalance strategies exist (#7580)
DESCRIPTION: Fix PG upgrades when invalid rebalance strategies exist Without this change an upgrade of a cluster with an invalid rebalance strategy would fail with an error like this: ``` cache lookup failed for shard_cost_function with oid 6077337 CONTEXT: SQL statement "SELECT citus_validate_rebalance_strategy_functions( NEW.shard_cost_function, NEW.node_capacity_function, NEW.shard_allowed_on_node_function)" PL/pgSQL function citus_internal.pg_dist_rebalance_strategy_trigger_func() line 5 at PERFORM SQL statement "INSERT INTO pg_catalog.pg_dist_rebalance_strategy SELECT name, default_strategy, shard_cost_function::regprocedure::regproc, node_capacity_function::regprocedure::regproc, shard_allowed_on_node_function::regprocedure::regproc, default_threshold, minimum_threshold, improvement_threshold FROM public.pg_dist_rebalance_strategy" PL/pgSQL function citus_finish_pg_upgrade() line 115 at SQL statement ``` This fixes that by disabling the trigger and simply re-inserting the invalid rebalance strategy without checking. We could also silently remove it, but this seems nicer.pull/7578/head^2
parent
16604a6601
commit
110b4192b2
|
@ -54,3 +54,4 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits;
|
|||
#include "udfs/citus_internal_update_placement_metadata/12.2-1.sql"
|
||||
#include "udfs/citus_internal_update_relation_colocation/12.2-1.sql"
|
||||
#include "udfs/repl_origin_helper/12.2-1.sql"
|
||||
#include "udfs/citus_finish_pg_upgrade/12.2-1.sql"
|
||||
|
|
|
@ -54,3 +54,4 @@ DROP FUNCTION citus_internal.update_relation_colocation(oid, int);
|
|||
DROP FUNCTION citus_internal.start_replication_origin_tracking();
|
||||
DROP FUNCTION citus_internal.stop_replication_origin_tracking();
|
||||
DROP FUNCTION citus_internal.is_replication_origin_tracking_active();
|
||||
#include "../udfs/citus_finish_pg_upgrade/12.1-1.sql"
|
||||
|
|
|
@ -0,0 +1,227 @@
|
|||
CREATE OR REPLACE FUNCTION pg_catalog.citus_finish_pg_upgrade()
|
||||
RETURNS void
|
||||
LANGUAGE plpgsql
|
||||
SET search_path = pg_catalog
|
||||
AS $cppu$
|
||||
DECLARE
|
||||
table_name regclass;
|
||||
command text;
|
||||
trigger_name text;
|
||||
BEGIN
|
||||
|
||||
|
||||
IF substring(current_Setting('server_version'), '\d+')::int >= 14 THEN
|
||||
EXECUTE $cmd$
|
||||
-- disable propagation to prevent EnsureCoordinator errors
|
||||
-- the aggregate created here does not depend on Citus extension (yet)
|
||||
-- since we add the dependency with the next command
|
||||
SET citus.enable_ddl_propagation TO OFF;
|
||||
CREATE AGGREGATE array_cat_agg(anycompatiblearray) (SFUNC = array_cat, STYPE = anycompatiblearray);
|
||||
COMMENT ON AGGREGATE array_cat_agg(anycompatiblearray)
|
||||
IS 'concatenate input arrays into a single array';
|
||||
RESET citus.enable_ddl_propagation;
|
||||
$cmd$;
|
||||
ELSE
|
||||
EXECUTE $cmd$
|
||||
SET citus.enable_ddl_propagation TO OFF;
|
||||
CREATE AGGREGATE array_cat_agg(anyarray) (SFUNC = array_cat, STYPE = anyarray);
|
||||
COMMENT ON AGGREGATE array_cat_agg(anyarray)
|
||||
IS 'concatenate input arrays into a single array';
|
||||
RESET citus.enable_ddl_propagation;
|
||||
$cmd$;
|
||||
END IF;
|
||||
|
||||
--
|
||||
-- Citus creates the array_cat_agg but because of a compatibility
|
||||
-- issue between pg13-pg14, we drop and create it during upgrade.
|
||||
-- And as Citus creates it, there needs to be a dependency to the
|
||||
-- Citus extension, so we create that dependency here.
|
||||
-- We are not using:
|
||||
-- ALTER EXENSION citus DROP/CREATE AGGREGATE array_cat_agg
|
||||
-- because we don't have an easy way to check if the aggregate
|
||||
-- exists with anyarray type or anycompatiblearray type.
|
||||
|
||||
INSERT INTO pg_depend
|
||||
SELECT
|
||||
'pg_proc'::regclass::oid as classid,
|
||||
(SELECT oid FROM pg_proc WHERE proname = 'array_cat_agg') as objid,
|
||||
0 as objsubid,
|
||||
'pg_extension'::regclass::oid as refclassid,
|
||||
(select oid from pg_extension where extname = 'citus') as refobjid,
|
||||
0 as refobjsubid ,
|
||||
'e' as deptype;
|
||||
|
||||
-- PG16 has its own any_value, so only create it pre PG16.
|
||||
-- We can remove this part when we drop support for PG16
|
||||
IF substring(current_Setting('server_version'), '\d+')::int < 16 THEN
|
||||
EXECUTE $cmd$
|
||||
-- disable propagation to prevent EnsureCoordinator errors
|
||||
-- the aggregate created here does not depend on Citus extension (yet)
|
||||
-- since we add the dependency with the next command
|
||||
SET citus.enable_ddl_propagation TO OFF;
|
||||
CREATE OR REPLACE FUNCTION pg_catalog.any_value_agg ( anyelement, anyelement )
|
||||
RETURNS anyelement AS $$
|
||||
SELECT CASE WHEN $1 IS NULL THEN $2 ELSE $1 END;
|
||||
$$ LANGUAGE SQL STABLE;
|
||||
|
||||
CREATE AGGREGATE pg_catalog.any_value (
|
||||
sfunc = pg_catalog.any_value_agg,
|
||||
combinefunc = pg_catalog.any_value_agg,
|
||||
basetype = anyelement,
|
||||
stype = anyelement
|
||||
);
|
||||
COMMENT ON AGGREGATE pg_catalog.any_value(anyelement) IS
|
||||
'Returns the value of any row in the group. It is mostly useful when you know there will be only 1 element.';
|
||||
RESET citus.enable_ddl_propagation;
|
||||
--
|
||||
-- Citus creates the any_value aggregate but because of a compatibility
|
||||
-- issue between pg15-pg16 -- any_value is created in PG16, we drop
|
||||
-- and create it during upgrade IF upgraded version is less than 16.
|
||||
-- And as Citus creates it, there needs to be a dependency to the
|
||||
-- Citus extension, so we create that dependency here.
|
||||
|
||||
INSERT INTO pg_depend
|
||||
SELECT
|
||||
'pg_proc'::regclass::oid as classid,
|
||||
(SELECT oid FROM pg_proc WHERE proname = 'any_value_agg') as objid,
|
||||
0 as objsubid,
|
||||
'pg_extension'::regclass::oid as refclassid,
|
||||
(select oid from pg_extension where extname = 'citus') as refobjid,
|
||||
0 as refobjsubid ,
|
||||
'e' as deptype;
|
||||
|
||||
INSERT INTO pg_depend
|
||||
SELECT
|
||||
'pg_proc'::regclass::oid as classid,
|
||||
(SELECT oid FROM pg_proc WHERE proname = 'any_value') as objid,
|
||||
0 as objsubid,
|
||||
'pg_extension'::regclass::oid as refclassid,
|
||||
(select oid from pg_extension where extname = 'citus') as refobjid,
|
||||
0 as refobjsubid ,
|
||||
'e' as deptype;
|
||||
$cmd$;
|
||||
END IF;
|
||||
|
||||
--
|
||||
-- restore citus catalog tables
|
||||
--
|
||||
INSERT INTO pg_catalog.pg_dist_partition SELECT * FROM public.pg_dist_partition;
|
||||
|
||||
-- if we are upgrading from PG14/PG15 to PG16+,
|
||||
-- we need to regenerate the partkeys because they will include varnullingrels as well.
|
||||
UPDATE pg_catalog.pg_dist_partition
|
||||
SET partkey = column_name_to_column(pg_dist_partkeys_pre_16_upgrade.logicalrelid, col_name)
|
||||
FROM public.pg_dist_partkeys_pre_16_upgrade
|
||||
WHERE pg_dist_partkeys_pre_16_upgrade.logicalrelid = pg_dist_partition.logicalrelid;
|
||||
DROP TABLE public.pg_dist_partkeys_pre_16_upgrade;
|
||||
|
||||
INSERT INTO pg_catalog.pg_dist_shard SELECT * FROM public.pg_dist_shard;
|
||||
INSERT INTO pg_catalog.pg_dist_placement SELECT * FROM public.pg_dist_placement;
|
||||
INSERT INTO pg_catalog.pg_dist_node_metadata SELECT * FROM public.pg_dist_node_metadata;
|
||||
INSERT INTO pg_catalog.pg_dist_node SELECT * FROM public.pg_dist_node;
|
||||
INSERT INTO pg_catalog.pg_dist_local_group SELECT * FROM public.pg_dist_local_group;
|
||||
INSERT INTO pg_catalog.pg_dist_transaction SELECT * FROM public.pg_dist_transaction;
|
||||
INSERT INTO pg_catalog.pg_dist_colocation SELECT * FROM public.pg_dist_colocation;
|
||||
INSERT INTO pg_catalog.pg_dist_cleanup SELECT * FROM public.pg_dist_cleanup;
|
||||
INSERT INTO pg_catalog.pg_dist_schema SELECT schemaname::regnamespace, colocationid FROM public.pg_dist_schema;
|
||||
-- enterprise catalog tables
|
||||
INSERT INTO pg_catalog.pg_dist_authinfo SELECT * FROM public.pg_dist_authinfo;
|
||||
INSERT INTO pg_catalog.pg_dist_poolinfo SELECT * FROM public.pg_dist_poolinfo;
|
||||
|
||||
-- Temporarily disable trigger to check for validity of functions while
|
||||
-- inserting. The current contents of the table might be invalid if one of
|
||||
-- the functions was removed by the user without also removing the
|
||||
-- rebalance strategy. Obviously that's not great, but it should be no
|
||||
-- reason to fail the upgrade.
|
||||
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DISABLE TRIGGER pg_dist_rebalance_strategy_validation_trigger;
|
||||
INSERT INTO pg_catalog.pg_dist_rebalance_strategy SELECT
|
||||
name,
|
||||
default_strategy,
|
||||
shard_cost_function::regprocedure::regproc,
|
||||
node_capacity_function::regprocedure::regproc,
|
||||
shard_allowed_on_node_function::regprocedure::regproc,
|
||||
default_threshold,
|
||||
minimum_threshold,
|
||||
improvement_threshold
|
||||
FROM public.pg_dist_rebalance_strategy;
|
||||
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_validation_trigger;
|
||||
|
||||
--
|
||||
-- drop backup tables
|
||||
--
|
||||
DROP TABLE public.pg_dist_authinfo;
|
||||
DROP TABLE public.pg_dist_colocation;
|
||||
DROP TABLE public.pg_dist_local_group;
|
||||
DROP TABLE public.pg_dist_node;
|
||||
DROP TABLE public.pg_dist_node_metadata;
|
||||
DROP TABLE public.pg_dist_partition;
|
||||
DROP TABLE public.pg_dist_placement;
|
||||
DROP TABLE public.pg_dist_poolinfo;
|
||||
DROP TABLE public.pg_dist_shard;
|
||||
DROP TABLE public.pg_dist_transaction;
|
||||
DROP TABLE public.pg_dist_rebalance_strategy;
|
||||
DROP TABLE public.pg_dist_cleanup;
|
||||
DROP TABLE public.pg_dist_schema;
|
||||
--
|
||||
-- reset sequences
|
||||
--
|
||||
PERFORM setval('pg_catalog.pg_dist_shardid_seq', (SELECT MAX(shardid)+1 AS max_shard_id FROM pg_dist_shard), false);
|
||||
PERFORM setval('pg_catalog.pg_dist_placement_placementid_seq', (SELECT MAX(placementid)+1 AS max_placement_id FROM pg_dist_placement), false);
|
||||
PERFORM setval('pg_catalog.pg_dist_groupid_seq', (SELECT MAX(groupid)+1 AS max_group_id FROM pg_dist_node), false);
|
||||
PERFORM setval('pg_catalog.pg_dist_node_nodeid_seq', (SELECT MAX(nodeid)+1 AS max_node_id FROM pg_dist_node), false);
|
||||
PERFORM setval('pg_catalog.pg_dist_colocationid_seq', (SELECT MAX(colocationid)+1 AS max_colocation_id FROM pg_dist_colocation), false);
|
||||
PERFORM setval('pg_catalog.pg_dist_operationid_seq', (SELECT MAX(operation_id)+1 AS max_operation_id FROM pg_dist_cleanup), false);
|
||||
PERFORM setval('pg_catalog.pg_dist_cleanup_recordid_seq', (SELECT MAX(record_id)+1 AS max_record_id FROM pg_dist_cleanup), false);
|
||||
PERFORM setval('pg_catalog.pg_dist_clock_logical_seq', (SELECT last_value FROM public.pg_dist_clock_logical_seq), false);
|
||||
DROP TABLE public.pg_dist_clock_logical_seq;
|
||||
|
||||
|
||||
|
||||
--
|
||||
-- register triggers
|
||||
--
|
||||
FOR table_name IN SELECT logicalrelid FROM pg_catalog.pg_dist_partition JOIN pg_class ON (logicalrelid = oid) WHERE relkind <> 'f'
|
||||
LOOP
|
||||
trigger_name := 'truncate_trigger_' || table_name::oid;
|
||||
command := 'create trigger ' || trigger_name || ' after truncate on ' || table_name || ' execute procedure pg_catalog.citus_truncate_trigger()';
|
||||
EXECUTE command;
|
||||
command := 'update pg_trigger set tgisinternal = true where tgname = ' || quote_literal(trigger_name);
|
||||
EXECUTE command;
|
||||
END LOOP;
|
||||
|
||||
--
|
||||
-- set dependencies
|
||||
--
|
||||
INSERT INTO pg_depend
|
||||
SELECT
|
||||
'pg_class'::regclass::oid as classid,
|
||||
p.logicalrelid::regclass::oid as objid,
|
||||
0 as objsubid,
|
||||
'pg_extension'::regclass::oid as refclassid,
|
||||
(select oid from pg_extension where extname = 'citus') as refobjid,
|
||||
0 as refobjsubid ,
|
||||
'n' as deptype
|
||||
FROM pg_catalog.pg_dist_partition p;
|
||||
|
||||
-- set dependencies for columnar table access method
|
||||
PERFORM columnar_internal.columnar_ensure_am_depends_catalog();
|
||||
|
||||
-- restore pg_dist_object from the stable identifiers
|
||||
TRUNCATE pg_catalog.pg_dist_object;
|
||||
INSERT INTO pg_catalog.pg_dist_object (classid, objid, objsubid, distribution_argument_index, colocationid)
|
||||
SELECT
|
||||
address.classid,
|
||||
address.objid,
|
||||
address.objsubid,
|
||||
naming.distribution_argument_index,
|
||||
naming.colocationid
|
||||
FROM
|
||||
public.pg_dist_object naming,
|
||||
pg_catalog.pg_get_object_address(naming.type, naming.object_names, naming.object_args) address;
|
||||
|
||||
DROP TABLE public.pg_dist_object;
|
||||
END;
|
||||
$cppu$;
|
||||
|
||||
COMMENT ON FUNCTION pg_catalog.citus_finish_pg_upgrade()
|
||||
IS 'perform tasks to restore citus settings from a location that has been prepared before pg_upgrade';
|
|
@ -128,6 +128,12 @@ BEGIN
|
|||
INSERT INTO pg_catalog.pg_dist_authinfo SELECT * FROM public.pg_dist_authinfo;
|
||||
INSERT INTO pg_catalog.pg_dist_poolinfo SELECT * FROM public.pg_dist_poolinfo;
|
||||
|
||||
-- Temporarily disable trigger to check for validity of functions while
|
||||
-- inserting. The current contents of the table might be invalid if one of
|
||||
-- the functions was removed by the user without also removing the
|
||||
-- rebalance strategy. Obviously that's not great, but it should be no
|
||||
-- reason to fail the upgrade.
|
||||
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DISABLE TRIGGER pg_dist_rebalance_strategy_validation_trigger;
|
||||
INSERT INTO pg_catalog.pg_dist_rebalance_strategy SELECT
|
||||
name,
|
||||
default_strategy,
|
||||
|
@ -138,6 +144,7 @@ BEGIN
|
|||
minimum_threshold,
|
||||
improvement_threshold
|
||||
FROM public.pg_dist_rebalance_strategy;
|
||||
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_validation_trigger;
|
||||
|
||||
--
|
||||
-- drop backup tables
|
||||
|
|
|
@ -1,8 +1,9 @@
|
|||
SELECT * FROM pg_catalog.pg_dist_rebalance_strategy ORDER BY name;
|
||||
name | default_strategy | shard_cost_function | node_capacity_function | shard_allowed_on_node_function | default_threshold | minimum_threshold | improvement_threshold
|
||||
name | default_strategy | shard_cost_function | node_capacity_function | shard_allowed_on_node_function | default_threshold | minimum_threshold | improvement_threshold
|
||||
---------------------------------------------------------------------
|
||||
by_disk_size | f | citus_shard_cost_by_disk_size | citus_node_capacity_1 | citus_shard_allowed_on_node_true | 0.1 | 0.01 | 0.5
|
||||
by_shard_count | f | citus_shard_cost_1 | citus_node_capacity_1 | citus_shard_allowed_on_node_true | 0 | 0 | 0
|
||||
custom_strategy | t | upgrade_rebalance_strategy.shard_cost_2 | upgrade_rebalance_strategy.capacity_high_worker_1 | upgrade_rebalance_strategy.only_worker_2 | 0.5 | 0.2 | 0.3
|
||||
(3 rows)
|
||||
by_disk_size | f | citus_shard_cost_by_disk_size | citus_node_capacity_1 | citus_shard_allowed_on_node_true | 0.1 | 0.01 | 0.5
|
||||
by_shard_count | f | citus_shard_cost_1 | citus_node_capacity_1 | citus_shard_allowed_on_node_true | 0 | 0 | 0
|
||||
custom_strategy | t | upgrade_rebalance_strategy.shard_cost_2 | upgrade_rebalance_strategy.capacity_high_worker_1 | upgrade_rebalance_strategy.only_worker_2 | 0.5 | 0.2 | 0.3
|
||||
invalid_strategy | f | 1234567 | upgrade_rebalance_strategy.capacity_high_worker_1 | upgrade_rebalance_strategy.only_worker_2 | 0.5 | 0.2 | 0.3
|
||||
(4 rows)
|
||||
|
||||
|
|
|
@ -35,3 +35,23 @@ SELECT citus_set_default_rebalance_strategy('custom_strategy');
|
|||
|
||||
(1 row)
|
||||
|
||||
-- Disable the trigger temporarily to allow the invalid strategy to be added.
|
||||
-- Normally an invalid strategy can end up in the table by deleting one of the
|
||||
-- functions it depends on. But we do directly in this test because we want to
|
||||
-- have a consistent OID, so we get consistent test output.
|
||||
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DISABLE TRIGGER pg_dist_rebalance_strategy_validation_trigger;
|
||||
SELECT citus_add_rebalance_strategy(
|
||||
'invalid_strategy',
|
||||
1234567,
|
||||
'capacity_high_worker_1',
|
||||
'only_worker_2',
|
||||
0.5,
|
||||
0.2,
|
||||
0.3
|
||||
);
|
||||
citus_add_rebalance_strategy
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_validation_trigger;
|
||||
|
|
|
@ -29,3 +29,19 @@ SELECT citus_add_rebalance_strategy(
|
|||
0.3
|
||||
);
|
||||
SELECT citus_set_default_rebalance_strategy('custom_strategy');
|
||||
|
||||
-- Disable the trigger temporarily to allow the invalid strategy to be added.
|
||||
-- Normally an invalid strategy can end up in the table by deleting one of the
|
||||
-- functions it depends on. But we do directly in this test because we want to
|
||||
-- have a consistent OID, so we get consistent test output.
|
||||
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy DISABLE TRIGGER pg_dist_rebalance_strategy_validation_trigger;
|
||||
SELECT citus_add_rebalance_strategy(
|
||||
'invalid_strategy',
|
||||
1234567,
|
||||
'capacity_high_worker_1',
|
||||
'only_worker_2',
|
||||
0.5,
|
||||
0.2,
|
||||
0.3
|
||||
);
|
||||
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy ENABLE TRIGGER pg_dist_rebalance_strategy_validation_trigger;
|
||||
|
|
Loading…
Reference in New Issue