Add breaking change detection for minor version upgrades (GUC and UDF)

pull/8334/head
eaydingol 2025-11-11 15:31:09 +03:00
parent daa69bec8f
commit 064d0298d7
4 changed files with 177 additions and 0 deletions

View File

@ -320,6 +320,21 @@ check-citus-upgrade-mixed-local: all clean-upgrade-artifacts
--citus-old-version=$(citus-old-version) \
--mixed
check-citus-minor-upgrade: all
$(citus_upgrade_check) \
--bindir=$(bindir) \
--pgxsdir=$(pgxsdir) \
--citus-pre-tar=$(citus-pre-tar) \
--citus-post-tar=$(citus-post-tar) \
--minor-upgrade
check-citus-minor-upgrade-local: all clean-upgrade-artifacts
$(citus_upgrade_check) \
--bindir=$(bindir) \
--pgxsdir=$(pgxsdir) \
--citus-old-version=$(citus-old-version) \
--minor-upgrade
clean-upgrade-artifacts:
rm -rf $(citus_abs_srcdir)/tmp_citus_upgrade/ /tmp/citus_copy/

View File

@ -194,6 +194,7 @@ class CitusUpgradeConfig(CitusBaseClusterConfig):
self.new_settings = {"citus.enable_version_checks": "false"}
self.user = SUPER_USER_NAME
self.mixed_mode = arguments["--mixed"]
self.minor_upgrade = arguments.get("--minor-upgrade", False)
self.fixed_port = 57635

View File

@ -11,6 +11,7 @@ Options:
--pgxsdir=<pgxsdir> Path to the PGXS directory(ex: ~/.pgenv/src/postgresql-11.3)
--citus-old-version=<citus-old-version> Citus old version for local run(ex v8.0.0)
--mixed Run the verification phase with one node not upgraded.
--minor-upgrade Use minor version upgrade test schedules instead of major version schedules.
"""
import multiprocessing
@ -55,7 +56,14 @@ def run_citus_upgrade_tests(config, before_upgrade_schedule, after_upgrade_sched
)
report_initial_version(config)
# Store the pre-upgrade GUCs and UDFs for minor version upgrades
pre_upgrade = None
if (config.minor_upgrade):
pre_upgrade = get_citus_gucs_and_udfs(config)
run_test_on_coordinator(config, before_upgrade_schedule)
remove_citus(config.pre_tar_path)
if after_upgrade_schedule is None:
return
@ -66,9 +74,132 @@ def run_citus_upgrade_tests(config, before_upgrade_schedule, after_upgrade_sched
run_alter_citus(config.bindir, config.mixed_mode, config)
verify_upgrade(config, config.mixed_mode, config.node_name_to_ports.values())
# For minor version upgrades, verify GUCs and UDFs does not have breaking changes
breaking_changes = []
if (config.minor_upgrade):
breaking_changes = compare_citus_gucs_and_udfs(config, pre_upgrade)
run_test_on_coordinator(config, after_upgrade_schedule)
remove_citus(config.post_tar_path)
# Fail the test if there are any breaking changes
if breaking_changes:
common.eprint("\n=== BREAKING CHANGES DETECTED ===")
for change in breaking_changes:
common.eprint(f" - {change}")
common.eprint("==================================\n")
sys.exit(1)
def get_citus_gucs_and_udfs(config):
results = {}
# Store GUCs
guc_results = utils.psql_capture(
config.bindir,
config.coordinator_port(),
"SELECT name, boot_val FROM pg_settings WHERE name LIKE 'citus.%' ORDER BY name;",
)
guc_lines = guc_results.decode("utf-8").strip().split("\n")
results['gucs'] = {}
for line in guc_lines[2:]: # Skip header lines
name, boot_val = line.split("|")
results['gucs'][name.strip()] = boot_val.strip()
# Store UDFs
udf_results = utils.psql_capture(
config.bindir,
config.coordinator_port(),
"""
SELECT
n.nspname AS schema_name,
p.proname AS function_name,
pg_get_function_arguments(p.oid) AS full_args,
pg_get_function_result(p.oid) AS return_type
FROM pg_proc p
JOIN pg_namespace n ON n.oid = p.pronamespace
JOIN pg_depend d ON d.objid = p.oid
JOIN pg_extension e ON e.oid = d.refobjid
WHERE e.extname = 'citus'
AND d.deptype = 'e'
ORDER BY schema_name, function_name, full_args;
""",
)
udf_lines = udf_results.decode("utf-8").strip().split("\n")
results['udfs'] = {}
for line in udf_lines[2:]: # Skip header lines
schema_name, function_name, full_args, return_type = line.split("|")
key = (schema_name.strip(), function_name.strip())
signature = (full_args.strip(), return_type.strip())
if key not in results['udfs']:
results['udfs'][key] = set()
results['udfs'][key].add(signature)
return results
def compare_citus_gucs_and_udfs(config, pre_upgrade):
post_upgrade = get_citus_gucs_and_udfs(config)
breaking_changes = []
# Compare GUCs
for name, boot_val in pre_upgrade['gucs'].items():
if name not in post_upgrade['gucs']:
breaking_changes.append(f"GUC {name} was removed")
elif post_upgrade['gucs'][name] != boot_val and name != 'citus.version':
breaking_changes.append(f"The default value of GUC {name} was changed from {boot_val} to {post_upgrade['gucs'][name]}")
# Compare UDFs - check if any pre-upgrade signatures were removed
for (schema_name, function_name), pre_signatures in pre_upgrade['udfs'].items():
if (schema_name, function_name) not in post_upgrade['udfs']:
breaking_changes.append(f"UDF {schema_name}.{function_name} was completely removed")
else:
post_signatures = post_upgrade['udfs'][(schema_name, function_name)]
removed_signatures = pre_signatures - post_signatures
if removed_signatures:
for full_args, return_type in removed_signatures:
if not find_compatible_udf_signature(full_args, return_type, post_signatures):
breaking_changes.append(f"UDF signature removed: {schema_name}.{function_name}({full_args}) RETURNS {return_type}")
return breaking_changes
def find_compatible_udf_signature(full_args, return_type, post_signatures):
pre_args_list = [arg.strip() for arg in full_args.split(",") if arg.strip()]
for post_full_args, post_return_type in post_signatures:
if post_return_type == return_type:
post_args_list = [arg.strip() for arg in post_full_args.split(",") if arg.strip()]
""" Here check if the function signatures are compatible, they are compatible if: post_args_list has all the arguments of pre_args_list in the same order, but can have additional arguments with default values """
pre_index = 0
post_index = 0
compatible = True
while pre_index < len(pre_args_list) and post_index < len(post_args_list):
if pre_args_list[pre_index] == post_args_list[post_index]:
pre_index += 1
else:
# Check if the argument in post_args_list has a default value
if "default" not in post_args_list[post_index].lower():
compatible = False
break
post_index += 1
if pre_index < len(pre_args_list):
compatible = False
continue
while post_index < len(post_args_list):
if "default" not in post_args_list[post_index].lower():
compatible = False
break
post_index += 1
if compatible:
return True
return False
def install_citus(tar_path):
if tar_path:

View File

@ -0,0 +1,30 @@
-- Citus upgrades are finished by calling a procedure
-- this is a transactional procedure, so rollback should be fine
BEGIN;
CALL citus_finish_citus_upgrade();
NOTICE: already at the latest distributed schema version (13.2-1)
ROLLBACK;
-- do the actual job
CALL citus_finish_citus_upgrade();
NOTICE: already at the latest distributed schema version (13.2-1)
-- show that the upgrade is successfull
SELECT metadata->>'last_upgrade_version' = extversion
FROM pg_dist_node_metadata, pg_extension WHERE extname = 'citus';
?column?
---------------------------------------------------------------------
f
(1 row)
-- idempotent, should be called multiple times
-- still, do not NOTICE the version as it changes per release
SET client_min_messages TO WARNING;
CALL citus_finish_citus_upgrade();
-- we should be able to sync metadata in nontransactional way as well
SET citus.metadata_sync_mode TO 'nontransactional';
SELECT start_metadata_sync_to_all_nodes();
start_metadata_sync_to_all_nodes
---------------------------------------------------------------------
t
(1 row)
RESET citus.metadata_sync_mode;