fix array_cat_agg for pg upgrades

array_cat_agg now needs to take anycompatiblearray instead of anyarray
because array_cat changed its type from anyarray to anycompatiblearray
with pg14.

To handle upgrades correctly, we drop the aggregate in
citus_pg_prepare_upgrade. To be able to drop it, we first remove the
dependency from pg_depend.

Then we create the right aggregate in citus_finish_pg_upgrade and we
also add the dependency back to pg_depend.
talha_pg14_support
Sait Talha Nisanci 2021-08-24 19:26:33 +03:00
parent 795a46f04c
commit cddb8663dd
8 changed files with 250 additions and 3 deletions

View File

@ -16,3 +16,5 @@ ALTER TABLE pg_catalog.pg_dist_placement ADD CONSTRAINT placement_shardid_groupi
#include "udfs/citus_internal_update_placement_metadata/10.2-1.sql";
#include "udfs/citus_internal_delete_shard_metadata/10.2-1.sql";
#include "udfs/citus_internal_update_relation_colocation/10.2-1.sql";
#include "udfs/citus_prepare_pg_upgrade/10.2-1.sql"
#include "udfs/citus_finish_pg_upgrade/10.2-1.sql"

View File

@ -0,0 +1,131 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_finish_pg_upgrade()
RETURNS void
LANGUAGE plpgsql
SET search_path = pg_catalog
AS $cppu$
DECLARE
table_name regclass;
command text;
trigger_name text;
BEGIN
IF substring(current_Setting('server_version'), '\d+')::int >= 14 THEN
EXECUTE $cmd$
CREATE AGGREGATE array_cat_agg(anycompatiblearray) (SFUNC = array_cat, STYPE = anycompatiblearray);
COMMENT ON AGGREGATE array_cat_agg(anycompatiblearray)
IS 'concatenate input arrays into a single array';
$cmd$;
ELSE
EXECUTE $cmd$
CREATE AGGREGATE array_cat_agg(anyarray) (SFUNC = array_cat, STYPE = anyarray);
COMMENT ON AGGREGATE array_cat_agg(anyarray)
IS 'concatenate input arrays into a single array';
$cmd$;
END IF;
INSERT INTO pg_depend
SELECT
'pg_proc'::regclass::oid as classid,
(SELECT oid FROM pg_proc WHERE proname = 'array_cat_agg') as objid,
0 as objsubid,
'pg_extension'::regclass::oid as refclassid,
(select oid from pg_extension where extname = 'citus') as refobjid,
0 as refobjsubid ,
'e' as deptype;
--
-- restore citus catalog tables
--
INSERT INTO pg_catalog.pg_dist_partition SELECT * FROM public.pg_dist_partition;
INSERT INTO pg_catalog.pg_dist_shard SELECT * FROM public.pg_dist_shard;
INSERT INTO pg_catalog.pg_dist_placement SELECT * FROM public.pg_dist_placement;
INSERT INTO pg_catalog.pg_dist_node_metadata SELECT * FROM public.pg_dist_node_metadata;
INSERT INTO pg_catalog.pg_dist_node SELECT * FROM public.pg_dist_node;
INSERT INTO pg_catalog.pg_dist_local_group SELECT * FROM public.pg_dist_local_group;
INSERT INTO pg_catalog.pg_dist_transaction SELECT * FROM public.pg_dist_transaction;
INSERT INTO pg_catalog.pg_dist_colocation SELECT * FROM public.pg_dist_colocation;
-- enterprise catalog tables
INSERT INTO pg_catalog.pg_dist_authinfo SELECT * FROM public.pg_dist_authinfo;
INSERT INTO pg_catalog.pg_dist_poolinfo SELECT * FROM public.pg_dist_poolinfo;
INSERT INTO pg_catalog.pg_dist_rebalance_strategy SELECT
name,
default_strategy,
shard_cost_function::regprocedure::regproc,
node_capacity_function::regprocedure::regproc,
shard_allowed_on_node_function::regprocedure::regproc,
default_threshold,
minimum_threshold,
improvement_threshold
FROM public.pg_dist_rebalance_strategy;
--
-- drop backup tables
--
DROP TABLE public.pg_dist_authinfo;
DROP TABLE public.pg_dist_colocation;
DROP TABLE public.pg_dist_local_group;
DROP TABLE public.pg_dist_node;
DROP TABLE public.pg_dist_node_metadata;
DROP TABLE public.pg_dist_partition;
DROP TABLE public.pg_dist_placement;
DROP TABLE public.pg_dist_poolinfo;
DROP TABLE public.pg_dist_shard;
DROP TABLE public.pg_dist_transaction;
DROP TABLE public.pg_dist_rebalance_strategy;
--
-- reset sequences
--
PERFORM setval('pg_catalog.pg_dist_shardid_seq', (SELECT MAX(shardid)+1 AS max_shard_id FROM pg_dist_shard), false);
PERFORM setval('pg_catalog.pg_dist_placement_placementid_seq', (SELECT MAX(placementid)+1 AS max_placement_id FROM pg_dist_placement), false);
PERFORM setval('pg_catalog.pg_dist_groupid_seq', (SELECT MAX(groupid)+1 AS max_group_id FROM pg_dist_node), false);
PERFORM setval('pg_catalog.pg_dist_node_nodeid_seq', (SELECT MAX(nodeid)+1 AS max_node_id FROM pg_dist_node), false);
PERFORM setval('pg_catalog.pg_dist_colocationid_seq', (SELECT MAX(colocationid)+1 AS max_colocation_id FROM pg_dist_colocation), false);
--
-- register triggers
--
FOR table_name IN SELECT logicalrelid FROM pg_catalog.pg_dist_partition
LOOP
trigger_name := 'truncate_trigger_' || table_name::oid;
command := 'create trigger ' || trigger_name || ' after truncate on ' || table_name || ' execute procedure pg_catalog.citus_truncate_trigger()';
EXECUTE command;
command := 'update pg_trigger set tgisinternal = true where tgname = ' || quote_literal(trigger_name);
EXECUTE command;
END LOOP;
--
-- set dependencies
--
INSERT INTO pg_depend
SELECT
'pg_class'::regclass::oid as classid,
p.logicalrelid::regclass::oid as objid,
0 as objsubid,
'pg_extension'::regclass::oid as refclassid,
(select oid from pg_extension where extname = 'citus') as refobjid,
0 as refobjsubid ,
'n' as deptype
FROM pg_catalog.pg_dist_partition p;
-- restore pg_dist_object from the stable identifiers
TRUNCATE citus.pg_dist_object;
INSERT INTO citus.pg_dist_object (classid, objid, objsubid, distribution_argument_index, colocationid)
SELECT
address.classid,
address.objid,
address.objsubid,
naming.distribution_argument_index,
naming.colocationid
FROM
public.pg_dist_object naming,
pg_catalog.pg_get_object_address(naming.type, naming.object_names, naming.object_args) address;
DROP TABLE public.pg_dist_object;
END;
$cppu$;
COMMENT ON FUNCTION pg_catalog.citus_finish_pg_upgrade()
IS 'perform tasks to restore citus settings from a location that has been prepared before pg_upgrade';

View File

@ -8,6 +8,32 @@ DECLARE
command text;
trigger_name text;
BEGIN
IF substring(current_Setting('server_version'), '\d+')::int >= 14 THEN
EXECUTE $cmd$
CREATE AGGREGATE array_cat_agg(anycompatiblearray) (SFUNC = array_cat, STYPE = anycompatiblearray);
COMMENT ON AGGREGATE array_cat_agg(anycompatiblearray)
IS 'concatenate input arrays into a single array';
$cmd$;
ELSE
EXECUTE $cmd$
CREATE AGGREGATE array_cat_agg(anyarray) (SFUNC = array_cat, STYPE = anyarray);
COMMENT ON AGGREGATE array_cat_agg(anyarray)
IS 'concatenate input arrays into a single array';
$cmd$;
END IF;
INSERT INTO pg_depend
SELECT
'pg_proc'::regclass::oid as classid,
(SELECT oid FROM pg_proc WHERE proname = 'array_cat_agg') as objid,
0 as objsubid,
'pg_extension'::regclass::oid as refclassid,
(select oid from pg_extension where extname = 'citus') as refobjid,
0 as refobjsubid ,
'e' as deptype;
--
-- restore citus catalog tables
--

View File

@ -0,0 +1,74 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_prepare_pg_upgrade()
RETURNS void
LANGUAGE plpgsql
SET search_path = pg_catalog
AS $cppu$
BEGIN
DELETE FROM pg_depend WHERE
objid IN (SELECT oid FROM pg_proc WHERE proname = 'array_cat_agg') AND
refobjid IN (select oid from pg_extension where extname = 'citus');
/*
* We are dropping the aggregates because postgres 14 changed
* array_cat type from anyarray to anycompatiblearray. When
* upgrading to pg14, spegifically when running pg_restore on
* array_cat_agg we would get an error. So we drop the aggregate
* and create the right one on citus_finish_pg_upgrade.
*/
DROP AGGREGATE IF EXISTS array_cat_agg(anyarray);
DROP AGGREGATE IF EXISTS array_cat_agg(anycompatiblearray);
--
-- Drop existing backup tables
--
DROP TABLE IF EXISTS public.pg_dist_partition;
DROP TABLE IF EXISTS public.pg_dist_shard;
DROP TABLE IF EXISTS public.pg_dist_placement;
DROP TABLE IF EXISTS public.pg_dist_node_metadata;
DROP TABLE IF EXISTS public.pg_dist_node;
DROP TABLE IF EXISTS public.pg_dist_local_group;
DROP TABLE IF EXISTS public.pg_dist_transaction;
DROP TABLE IF EXISTS public.pg_dist_colocation;
DROP TABLE IF EXISTS public.pg_dist_authinfo;
DROP TABLE IF EXISTS public.pg_dist_poolinfo;
DROP TABLE IF EXISTS public.pg_dist_rebalance_strategy;
DROP TABLE IF EXISTS public.pg_dist_object;
--
-- backup citus catalog tables
--
CREATE TABLE public.pg_dist_partition AS SELECT * FROM pg_catalog.pg_dist_partition;
CREATE TABLE public.pg_dist_shard AS SELECT * FROM pg_catalog.pg_dist_shard;
CREATE TABLE public.pg_dist_placement AS SELECT * FROM pg_catalog.pg_dist_placement;
CREATE TABLE public.pg_dist_node_metadata AS SELECT * FROM pg_catalog.pg_dist_node_metadata;
CREATE TABLE public.pg_dist_node AS SELECT * FROM pg_catalog.pg_dist_node;
CREATE TABLE public.pg_dist_local_group AS SELECT * FROM pg_catalog.pg_dist_local_group;
CREATE TABLE public.pg_dist_transaction AS SELECT * FROM pg_catalog.pg_dist_transaction;
CREATE TABLE public.pg_dist_colocation AS SELECT * FROM pg_catalog.pg_dist_colocation;
-- enterprise catalog tables
CREATE TABLE public.pg_dist_authinfo AS SELECT * FROM pg_catalog.pg_dist_authinfo;
CREATE TABLE public.pg_dist_poolinfo AS SELECT * FROM pg_catalog.pg_dist_poolinfo;
CREATE TABLE public.pg_dist_rebalance_strategy AS SELECT
name,
default_strategy,
shard_cost_function::regprocedure::text,
node_capacity_function::regprocedure::text,
shard_allowed_on_node_function::regprocedure::text,
default_threshold,
minimum_threshold,
improvement_threshold
FROM pg_catalog.pg_dist_rebalance_strategy;
-- store upgrade stable identifiers on pg_dist_object catalog
CREATE TABLE public.pg_dist_object AS SELECT
address.type,
address.object_names,
address.object_args,
objects.distribution_argument_index,
objects.colocationid
FROM citus.pg_dist_object objects,
pg_catalog.pg_identify_object_as_address(objects.classid, objects.objid, objects.objsubid) address;
END;
$cppu$;
COMMENT ON FUNCTION pg_catalog.citus_prepare_pg_upgrade()
IS 'perform tasks to copy citus settings to a location that could later be restored after pg_upgrade is done';

View File

@ -4,6 +4,19 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_prepare_pg_upgrade()
SET search_path = pg_catalog
AS $cppu$
BEGIN
DELETE FROM pg_depend WHERE
objid IN (SELECT oid FROM pg_proc WHERE proname = 'array_cat_agg') AND
refobjid IN (select oid from pg_extension where extname = 'citus');
/*
* We are dropping the aggregates because postgres 14 changed
* array_cat type from anyarray to anycompatiblearray. When
* upgrading to pg14, spegifically when running pg_restore on
* array_cat_agg we would get an error. So we drop the aggregate
* and create the right one on citus_finish_pg_upgrade.
*/
DROP AGGREGATE IF EXISTS array_cat_agg(anyarray);
DROP AGGREGATE IF EXISTS array_cat_agg(anycompatiblearray);
--
-- Drop existing backup tables
--

View File

@ -240,4 +240,5 @@ s/ERROR: ROLLBACK is not allowed in an SQL function/ERROR: ROLLBACK is not all
/Parent Relationship/d
/Parent-Relationship/d
s/function array_cat_agg\(anyarray\) anyarray/function array_cat_agg\(anycompatiblearray\) anycompatiblearray/g
s/TRIM\(BOTH FROM value\)/btrim\(value\)/g
s/TRIM\(BOTH FROM value\)/btrim\(value\)/g
s/function array_cat_agg\(anycompatiblearray\)/function array_cat_agg\(anyarray\)/g

View File

@ -26,7 +26,7 @@ ORDER BY 1;
function alter_table_set_access_method(regclass,text)
function any_value(anyelement)
function any_value_agg(anyelement,anyelement)
function array_cat_agg(anyarray)
function array_cat_agg(anycompatiblearray)
function assign_distributed_transaction_id(integer,bigint,timestamp with time zone)
function authinfo_valid(text)
function broadcast_intermediate_result(text,text)

View File

@ -23,7 +23,7 @@ ORDER BY 1;
function alter_table_set_access_method(regclass,text)
function any_value(anyelement)
function any_value_agg(anyelement,anyelement)
function array_cat_agg(anyarray)
function array_cat_agg(anycompatiblearray)
function assign_distributed_transaction_id(integer,bigint,timestamp with time zone)
function authinfo_valid(text)
function broadcast_intermediate_result(text,text)