diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index bd3739717..5d464f94a 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -1847,7 +1847,11 @@ MasterAggregateExpression(Aggref *originalAggregate, { /* array_cat_agg() takes anyarray as input */ catAggregateName = ARRAY_CAT_AGGREGATE_NAME; +#if PG_VERSION_NUM >= PG_VERSION_14 + catInputType = ANYCOMPATIBLEARRAYOID; +#else catInputType = ANYARRAYOID; +#endif } else if (aggregateType == AGGREGATE_JSONB_AGG || aggregateType == AGGREGATE_JSONB_OBJECT_AGG) @@ -1882,7 +1886,25 @@ MasterAggregateExpression(Aggref *originalAggregate, newMasterAggregate->args = list_make1(catAggArgument); newMasterAggregate->aggfilter = NULL; newMasterAggregate->aggtranstype = InvalidOid; - newMasterAggregate->aggargtypes = list_make1_oid(ANYARRAYOID); + + if (aggregateType == AGGREGATE_ARRAY_AGG) + { +#if PG_VERSION_NUM >= PG_VERSION_14 + /* + * Postgres expects the type of the array here such as INT4ARRAYOID. + * Hence we set it to workerReturnType. If we set this to + * ANYCOMPATIBLEARRAYOID then we will get the following error: + * "argument declared anycompatiblearray is not an array but type anycompatiblearray" + */ + newMasterAggregate->aggargtypes = list_make1_oid(workerReturnType); +#else + newMasterAggregate->aggargtypes = list_make1_oid(ANYARRAYOID); +#endif + } + else + { + newMasterAggregate->aggargtypes = list_make1_oid(ANYARRAYOID); + } newMasterAggregate->aggsplit = AGGSPLIT_SIMPLE; newMasterExpression = (Expr *) newMasterAggregate; diff --git a/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql b/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql index 7f0f8dfb0..ec5d162eb 100644 --- a/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql +++ b/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql @@ -28,3 +28,5 @@ CREATE FUNCTION pg_catalog.citus_drop_all_shards(logicalrelid regclass, COMMENT ON FUNCTION pg_catalog.citus_drop_all_shards(regclass, text, text, boolean) IS 'drop all shards in a relation and update metadata'; #include "udfs/citus_drop_trigger/10.2-1.sql"; +#include "udfs/citus_prepare_pg_upgrade/10.2-1.sql" +#include "udfs/citus_finish_pg_upgrade/10.2-1.sql" diff --git a/src/backend/distributed/sql/citus--8.0-1.sql b/src/backend/distributed/sql/citus--8.0-1.sql index 5647e01fd..e27c773d7 100644 --- a/src/backend/distributed/sql/citus--8.0-1.sql +++ b/src/backend/distributed/sql/citus--8.0-1.sql @@ -320,9 +320,23 @@ CREATE TRIGGER dist_shard_cache_invalidate -- Citus aggregates + +DO $proc$ +BEGIN +IF substring(current_Setting('server_version'), '\d+')::int >= 14 THEN + EXECUTE $$ +CREATE AGGREGATE array_cat_agg(anycompatiblearray) (SFUNC = array_cat, STYPE = anycompatiblearray); +COMMENT ON AGGREGATE array_cat_agg(anycompatiblearray) + IS 'concatenate input arrays into a single array'; + $$; +ELSE + EXECUTE $$ CREATE AGGREGATE array_cat_agg(anyarray) (SFUNC = array_cat, STYPE = anyarray); COMMENT ON AGGREGATE array_cat_agg(anyarray) IS 'concatenate input arrays into a single array'; + $$; +END IF; +END$proc$; GRANT SELECT ON pg_catalog.pg_dist_partition TO public; GRANT SELECT ON pg_catalog.pg_dist_shard TO public; diff --git a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/10.2-1.sql b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/10.2-1.sql new file mode 100644 index 000000000..ae3b0b900 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/10.2-1.sql @@ -0,0 +1,141 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_finish_pg_upgrade() + RETURNS void + LANGUAGE plpgsql + SET search_path = pg_catalog + AS $cppu$ +DECLARE + table_name regclass; + command text; + trigger_name text; +BEGIN + + + IF substring(current_Setting('server_version'), '\d+')::int >= 14 THEN + EXECUTE $cmd$ + CREATE AGGREGATE array_cat_agg(anycompatiblearray) (SFUNC = array_cat, STYPE = anycompatiblearray); + COMMENT ON AGGREGATE array_cat_agg(anycompatiblearray) + IS 'concatenate input arrays into a single array'; + $cmd$; + ELSE + EXECUTE $cmd$ + CREATE AGGREGATE array_cat_agg(anyarray) (SFUNC = array_cat, STYPE = anyarray); + COMMENT ON AGGREGATE array_cat_agg(anyarray) + IS 'concatenate input arrays into a single array'; + $cmd$; + END IF; + + /* + * Citus creates the array_cat_agg but because of a compatibility + * issue between pg13-pg14, we drop and create it during upgrade. + * And as Citus creates it, there needs to be a dependency to the + * Citus extension, so we create that dependency here. + * We are not using: + * ALTER EXENSION citus DROP/CREATE AGGREGATE array_cat_agg + * because we don't have an easy way to check if the aggregate + * exists with anyarray type or anycompatiblearray type. + */ + INSERT INTO pg_depend + SELECT + 'pg_proc'::regclass::oid as classid, + (SELECT oid FROM pg_proc WHERE proname = 'array_cat_agg') as objid, + 0 as objsubid, + 'pg_extension'::regclass::oid as refclassid, + (select oid from pg_extension where extname = 'citus') as refobjid, + 0 as refobjsubid , + 'e' as deptype; + + -- + -- restore citus catalog tables + -- + INSERT INTO pg_catalog.pg_dist_partition SELECT * FROM public.pg_dist_partition; + INSERT INTO pg_catalog.pg_dist_shard SELECT * FROM public.pg_dist_shard; + INSERT INTO pg_catalog.pg_dist_placement SELECT * FROM public.pg_dist_placement; + INSERT INTO pg_catalog.pg_dist_node_metadata SELECT * FROM public.pg_dist_node_metadata; + INSERT INTO pg_catalog.pg_dist_node SELECT * FROM public.pg_dist_node; + INSERT INTO pg_catalog.pg_dist_local_group SELECT * FROM public.pg_dist_local_group; + INSERT INTO pg_catalog.pg_dist_transaction SELECT * FROM public.pg_dist_transaction; + INSERT INTO pg_catalog.pg_dist_colocation SELECT * FROM public.pg_dist_colocation; + -- enterprise catalog tables + INSERT INTO pg_catalog.pg_dist_authinfo SELECT * FROM public.pg_dist_authinfo; + INSERT INTO pg_catalog.pg_dist_poolinfo SELECT * FROM public.pg_dist_poolinfo; + + INSERT INTO pg_catalog.pg_dist_rebalance_strategy SELECT + name, + default_strategy, + shard_cost_function::regprocedure::regproc, + node_capacity_function::regprocedure::regproc, + shard_allowed_on_node_function::regprocedure::regproc, + default_threshold, + minimum_threshold, + improvement_threshold + FROM public.pg_dist_rebalance_strategy; + + -- + -- drop backup tables + -- + DROP TABLE public.pg_dist_authinfo; + DROP TABLE public.pg_dist_colocation; + DROP TABLE public.pg_dist_local_group; + DROP TABLE public.pg_dist_node; + DROP TABLE public.pg_dist_node_metadata; + DROP TABLE public.pg_dist_partition; + DROP TABLE public.pg_dist_placement; + DROP TABLE public.pg_dist_poolinfo; + DROP TABLE public.pg_dist_shard; + DROP TABLE public.pg_dist_transaction; + DROP TABLE public.pg_dist_rebalance_strategy; + + -- + -- reset sequences + -- + PERFORM setval('pg_catalog.pg_dist_shardid_seq', (SELECT MAX(shardid)+1 AS max_shard_id FROM pg_dist_shard), false); + PERFORM setval('pg_catalog.pg_dist_placement_placementid_seq', (SELECT MAX(placementid)+1 AS max_placement_id FROM pg_dist_placement), false); + PERFORM setval('pg_catalog.pg_dist_groupid_seq', (SELECT MAX(groupid)+1 AS max_group_id FROM pg_dist_node), false); + PERFORM setval('pg_catalog.pg_dist_node_nodeid_seq', (SELECT MAX(nodeid)+1 AS max_node_id FROM pg_dist_node), false); + PERFORM setval('pg_catalog.pg_dist_colocationid_seq', (SELECT MAX(colocationid)+1 AS max_colocation_id FROM pg_dist_colocation), false); + + -- + -- register triggers + -- + FOR table_name IN SELECT logicalrelid FROM pg_catalog.pg_dist_partition + LOOP + trigger_name := 'truncate_trigger_' || table_name::oid; + command := 'create trigger ' || trigger_name || ' after truncate on ' || table_name || ' execute procedure pg_catalog.citus_truncate_trigger()'; + EXECUTE command; + command := 'update pg_trigger set tgisinternal = true where tgname = ' || quote_literal(trigger_name); + EXECUTE command; + END LOOP; + + -- + -- set dependencies + -- + INSERT INTO pg_depend + SELECT + 'pg_class'::regclass::oid as classid, + p.logicalrelid::regclass::oid as objid, + 0 as objsubid, + 'pg_extension'::regclass::oid as refclassid, + (select oid from pg_extension where extname = 'citus') as refobjid, + 0 as refobjsubid , + 'n' as deptype + FROM pg_catalog.pg_dist_partition p; + + -- restore pg_dist_object from the stable identifiers + TRUNCATE citus.pg_dist_object; + INSERT INTO citus.pg_dist_object (classid, objid, objsubid, distribution_argument_index, colocationid) + SELECT + address.classid, + address.objid, + address.objsubid, + naming.distribution_argument_index, + naming.colocationid + FROM + public.pg_dist_object naming, + pg_catalog.pg_get_object_address(naming.type, naming.object_names, naming.object_args) address; + + DROP TABLE public.pg_dist_object; +END; +$cppu$; + +COMMENT ON FUNCTION pg_catalog.citus_finish_pg_upgrade() + IS 'perform tasks to restore citus settings from a location that has been prepared before pg_upgrade'; diff --git a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql index 5902b646f..ae3b0b900 100644 --- a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql @@ -8,6 +8,42 @@ DECLARE command text; trigger_name text; BEGIN + + + IF substring(current_Setting('server_version'), '\d+')::int >= 14 THEN + EXECUTE $cmd$ + CREATE AGGREGATE array_cat_agg(anycompatiblearray) (SFUNC = array_cat, STYPE = anycompatiblearray); + COMMENT ON AGGREGATE array_cat_agg(anycompatiblearray) + IS 'concatenate input arrays into a single array'; + $cmd$; + ELSE + EXECUTE $cmd$ + CREATE AGGREGATE array_cat_agg(anyarray) (SFUNC = array_cat, STYPE = anyarray); + COMMENT ON AGGREGATE array_cat_agg(anyarray) + IS 'concatenate input arrays into a single array'; + $cmd$; + END IF; + + /* + * Citus creates the array_cat_agg but because of a compatibility + * issue between pg13-pg14, we drop and create it during upgrade. + * And as Citus creates it, there needs to be a dependency to the + * Citus extension, so we create that dependency here. + * We are not using: + * ALTER EXENSION citus DROP/CREATE AGGREGATE array_cat_agg + * because we don't have an easy way to check if the aggregate + * exists with anyarray type or anycompatiblearray type. + */ + INSERT INTO pg_depend + SELECT + 'pg_proc'::regclass::oid as classid, + (SELECT oid FROM pg_proc WHERE proname = 'array_cat_agg') as objid, + 0 as objsubid, + 'pg_extension'::regclass::oid as refclassid, + (select oid from pg_extension where extname = 'citus') as refobjid, + 0 as refobjsubid , + 'e' as deptype; + -- -- restore citus catalog tables -- diff --git a/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/10.2-1.sql b/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/10.2-1.sql new file mode 100644 index 000000000..8b5e13bc0 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/10.2-1.sql @@ -0,0 +1,74 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_prepare_pg_upgrade() + RETURNS void + LANGUAGE plpgsql + SET search_path = pg_catalog + AS $cppu$ +BEGIN + + DELETE FROM pg_depend WHERE + objid IN (SELECT oid FROM pg_proc WHERE proname = 'array_cat_agg') AND + refobjid IN (select oid from pg_extension where extname = 'citus'); + /* + * We are dropping the aggregates because postgres 14 changed + * array_cat type from anyarray to anycompatiblearray. When + * upgrading to pg14, spegifically when running pg_restore on + * array_cat_agg we would get an error. So we drop the aggregate + * and create the right one on citus_finish_pg_upgrade. + */ + DROP AGGREGATE IF EXISTS array_cat_agg(anyarray); + DROP AGGREGATE IF EXISTS array_cat_agg(anycompatiblearray); + -- + -- Drop existing backup tables + -- + DROP TABLE IF EXISTS public.pg_dist_partition; + DROP TABLE IF EXISTS public.pg_dist_shard; + DROP TABLE IF EXISTS public.pg_dist_placement; + DROP TABLE IF EXISTS public.pg_dist_node_metadata; + DROP TABLE IF EXISTS public.pg_dist_node; + DROP TABLE IF EXISTS public.pg_dist_local_group; + DROP TABLE IF EXISTS public.pg_dist_transaction; + DROP TABLE IF EXISTS public.pg_dist_colocation; + DROP TABLE IF EXISTS public.pg_dist_authinfo; + DROP TABLE IF EXISTS public.pg_dist_poolinfo; + DROP TABLE IF EXISTS public.pg_dist_rebalance_strategy; + DROP TABLE IF EXISTS public.pg_dist_object; + + -- + -- backup citus catalog tables + -- + CREATE TABLE public.pg_dist_partition AS SELECT * FROM pg_catalog.pg_dist_partition; + CREATE TABLE public.pg_dist_shard AS SELECT * FROM pg_catalog.pg_dist_shard; + CREATE TABLE public.pg_dist_placement AS SELECT * FROM pg_catalog.pg_dist_placement; + CREATE TABLE public.pg_dist_node_metadata AS SELECT * FROM pg_catalog.pg_dist_node_metadata; + CREATE TABLE public.pg_dist_node AS SELECT * FROM pg_catalog.pg_dist_node; + CREATE TABLE public.pg_dist_local_group AS SELECT * FROM pg_catalog.pg_dist_local_group; + CREATE TABLE public.pg_dist_transaction AS SELECT * FROM pg_catalog.pg_dist_transaction; + CREATE TABLE public.pg_dist_colocation AS SELECT * FROM pg_catalog.pg_dist_colocation; + -- enterprise catalog tables + CREATE TABLE public.pg_dist_authinfo AS SELECT * FROM pg_catalog.pg_dist_authinfo; + CREATE TABLE public.pg_dist_poolinfo AS SELECT * FROM pg_catalog.pg_dist_poolinfo; + CREATE TABLE public.pg_dist_rebalance_strategy AS SELECT + name, + default_strategy, + shard_cost_function::regprocedure::text, + node_capacity_function::regprocedure::text, + shard_allowed_on_node_function::regprocedure::text, + default_threshold, + minimum_threshold, + improvement_threshold + FROM pg_catalog.pg_dist_rebalance_strategy; + + -- store upgrade stable identifiers on pg_dist_object catalog + CREATE TABLE public.pg_dist_object AS SELECT + address.type, + address.object_names, + address.object_args, + objects.distribution_argument_index, + objects.colocationid + FROM citus.pg_dist_object objects, + pg_catalog.pg_identify_object_as_address(objects.classid, objects.objid, objects.objsubid) address; +END; +$cppu$; + +COMMENT ON FUNCTION pg_catalog.citus_prepare_pg_upgrade() + IS 'perform tasks to copy citus settings to a location that could later be restored after pg_upgrade is done'; diff --git a/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/latest.sql b/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/latest.sql index fa2014870..8b5e13bc0 100644 --- a/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_prepare_pg_upgrade/latest.sql @@ -4,6 +4,19 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_prepare_pg_upgrade() SET search_path = pg_catalog AS $cppu$ BEGIN + + DELETE FROM pg_depend WHERE + objid IN (SELECT oid FROM pg_proc WHERE proname = 'array_cat_agg') AND + refobjid IN (select oid from pg_extension where extname = 'citus'); + /* + * We are dropping the aggregates because postgres 14 changed + * array_cat type from anyarray to anycompatiblearray. When + * upgrading to pg14, spegifically when running pg_restore on + * array_cat_agg we would get an error. So we drop the aggregate + * and create the right one on citus_finish_pg_upgrade. + */ + DROP AGGREGATE IF EXISTS array_cat_agg(anyarray); + DROP AGGREGATE IF EXISTS array_cat_agg(anycompatiblearray); -- -- Drop existing backup tables -- diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index 750a8e676..a0dfb1c4a 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -229,3 +229,4 @@ s/ERROR: parallel workers for vacuum must/ERROR: parallel vacuum degree must/g # ignore PL/pgSQL line numbers that differ on Mac builds s/(CONTEXT: PL\/pgSQL function .* line )([0-9]+)/\1XX/g +s/function array_cat_agg\(anycompatiblearray\)/function array_cat_agg\(anyarray\)/g diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index c6a5b1bd1..b115ecfc7 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -139,7 +139,7 @@ SELECT * FROM multi_extension.print_extension_changes(); | function alter_role_if_exists(text,text) boolean | function any_value(anyelement) anyelement | function any_value_agg(anyelement,anyelement) anyelement - | function array_cat_agg(anyarray) anyarray + | function array_cat_agg(anycompatiblearray) anycompatiblearray | function assign_distributed_transaction_id(integer,bigint,timestamp with time zone) void | function authinfo_valid(text) boolean | function broadcast_intermediate_result(text,text) bigint diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index d1212ce27..5983d6fbc 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -26,7 +26,7 @@ ORDER BY 1; function alter_table_set_access_method(regclass,text) function any_value(anyelement) function any_value_agg(anyelement,anyelement) - function array_cat_agg(anyarray) + function array_cat_agg(anycompatiblearray) function assign_distributed_transaction_id(integer,bigint,timestamp with time zone) function authinfo_valid(text) function broadcast_intermediate_result(text,text) diff --git a/src/test/regress/expected/upgrade_list_citus_objects_0.out b/src/test/regress/expected/upgrade_list_citus_objects_0.out index 045b538f2..fc2db7cd7 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects_0.out +++ b/src/test/regress/expected/upgrade_list_citus_objects_0.out @@ -23,7 +23,7 @@ ORDER BY 1; function alter_table_set_access_method(regclass,text) function any_value(anyelement) function any_value_agg(anyelement,anyelement) - function array_cat_agg(anyarray) + function array_cat_agg(anycompatiblearray) function assign_distributed_transaction_id(integer,bigint,timestamp with time zone) function authinfo_valid(text) function broadcast_intermediate_result(text,text)