From 6ef177c1a3079a6517cf84e0532184a0900908c0 Mon Sep 17 00:00:00 2001 From: eaydingol <60466783+eaydingol@users.noreply.github.com> Date: Fri, 14 Nov 2025 11:31:56 +0300 Subject: [PATCH] Add breaking change detection for minor version upgrades (#8334) (#8339) This PR introduces infrastructure and validation to detect breaking changes during Citus minor version upgrades, designed to run in release branches only. **Breaking change detection:** - [GUCs] Detects removed GUCs and changes to default values - [UDFs] Detects removed functions and function signature changes -- Supports backward-compatible function overloading (new optional parameters allowed) - [types] Detects removed data types - [tables/views] Detects removed tables/views and removed/changed columns - New make targets for minor version upgrade tests - Follow-up PRs will add test schedules with different upgrade scenarios The test will be enabled in release branches (e.g., release-13) via the new test-citus-minor-upgrade job shown below. It will not run on the main branch. Testing Verified locally with sample breaking changes: `make check-citus-minor-upgrade-local citus-old-version=v13.2.0 ` **Test case 1:** Backward-compatible signature change (allowed) ``` -- Old: CREATE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer) -- New: CREATE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer, pBlockedByPid integer DEFAULT NULL) ``` No breaking change detected (new parameter has DEFAULT) **Test case 2:** Incompatible signature change (breaking) ``` -- Old: CREATE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer) -- New: CREATE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer, pBlockedByPid integer) ``` Breaking change detected: `UDF signature removed: pg_catalog.citus_blocking_pids(pblockedpid integer) RETURNS integer[]` **Test case 3:** GUC changes (breaking) - Removed `citus.max_worker_nodes_tracked` - Changed default value of `citus.max_shared_pool_size` from 0 to 4 Breaking change detected: ``` The default value of GUC citus.max_shared_pool_size was changed from 0 to 4 GUC citus.max_worker_nodes_tracked was removed ``` **Test case 4:** Table/view changes - Dropped `pg_catalog.pg_dist_rebalance_strategy` and removed a column from `pg_catalog.citus_lock_waits` ``` - Column blocking_nodeid in table/view pg_catalog.citus_lock_waits was removed - Table/view pg_catalog.pg_dist_rebalance_strategy was removed ``` **Test case 5:** Remove a custom type - Dropped `cluster_clock` and the objects depend on it. In addition to the dependent objects, test shows: ``` - Type pg_catalog.cluster_clock was removed ``` Sample new job for build and test workflow (for release branches): ``` test-citus-minor-upgrade: name: PG17 - check-citus-minor-upgrade runs-on: ubuntu-latest container: image: "${{ needs.params.outputs.citusupgrade_image_name }}:${{ fromJson(needs.params.outputs.pg17_version).full }}${{ needs.params.outputs.image_suffix }}" options: --user root needs: - params - build env: citus_version: 13.2 steps: - uses: actions/checkout@v4 - uses: "./.github/actions/setup_extension" with: skip_installation: true - name: Install and test citus minor version upgrade run: |- gosu circleci \ make -C src/test/regress \ check-citus-minor-upgrade \ bindir=/usr/lib/postgresql/${PG_MAJOR}/bin \ citus-pre-tar=/install-pg${PG_MAJOR}-citus${citus_version}.tar \ citus-post-tar=${GITHUB_WORKSPACE}/install-$PG_MAJOR.tar; - uses: "./.github/actions/save_logs_and_results" if: always() with: folder: ${{ env.PG_MAJOR }}_citus_minor_upgrade - uses: "./.github/actions/upload_coverage" if: always() with: flags: ${{ env.PG_MAJOR }}_citus_minor_upgrade codecov_token: ${{ secrets.CODECOV_TOKEN }} ``` (Cherry-picked from https://github.com/citusdata/citus/pull/8334 ) --- src/test/regress/Makefile | 15 ++ src/test/regress/citus_tests/config.py | 1 + .../citus_tests/upgrade/citus_upgrade_test.py | 227 ++++++++++++++++++ .../upgrade_citus_finish_citus_upgrade_2.out | 30 +++ 4 files changed, 273 insertions(+) create mode 100644 src/test/regress/expected/upgrade_citus_finish_citus_upgrade_2.out diff --git a/src/test/regress/Makefile b/src/test/regress/Makefile index 4b00dc4d5..e9949ecce 100644 --- a/src/test/regress/Makefile +++ b/src/test/regress/Makefile @@ -320,6 +320,21 @@ check-citus-upgrade-mixed-local: all clean-upgrade-artifacts --citus-old-version=$(citus-old-version) \ --mixed +check-citus-minor-upgrade: all + $(citus_upgrade_check) \ + --bindir=$(bindir) \ + --pgxsdir=$(pgxsdir) \ + --citus-pre-tar=$(citus-pre-tar) \ + --citus-post-tar=$(citus-post-tar) \ + --minor-upgrade + +check-citus-minor-upgrade-local: all clean-upgrade-artifacts + $(citus_upgrade_check) \ + --bindir=$(bindir) \ + --pgxsdir=$(pgxsdir) \ + --citus-old-version=$(citus-old-version) \ + --minor-upgrade + clean-upgrade-artifacts: rm -rf $(citus_abs_srcdir)/tmp_citus_upgrade/ /tmp/citus_copy/ diff --git a/src/test/regress/citus_tests/config.py b/src/test/regress/citus_tests/config.py index 2e3375856..501960021 100644 --- a/src/test/regress/citus_tests/config.py +++ b/src/test/regress/citus_tests/config.py @@ -194,6 +194,7 @@ class CitusUpgradeConfig(CitusBaseClusterConfig): self.new_settings = {"citus.enable_version_checks": "false"} self.user = SUPER_USER_NAME self.mixed_mode = arguments["--mixed"] + self.minor_upgrade = arguments.get("--minor-upgrade", False) self.fixed_port = 57635 diff --git a/src/test/regress/citus_tests/upgrade/citus_upgrade_test.py b/src/test/regress/citus_tests/upgrade/citus_upgrade_test.py index 1ab448031..b093b76c7 100755 --- a/src/test/regress/citus_tests/upgrade/citus_upgrade_test.py +++ b/src/test/regress/citus_tests/upgrade/citus_upgrade_test.py @@ -11,6 +11,7 @@ Options: --pgxsdir= Path to the PGXS directory(ex: ~/.pgenv/src/postgresql-11.3) --citus-old-version= Citus old version for local run(ex v8.0.0) --mixed Run the verification phase with one node not upgraded. + --minor-upgrade Use minor version upgrade test schedules instead of major version schedules. """ import multiprocessing @@ -55,7 +56,14 @@ def run_citus_upgrade_tests(config, before_upgrade_schedule, after_upgrade_sched ) report_initial_version(config) + + # Store the pre-upgrade GUCs and UDFs for minor version upgrades + pre_upgrade = None + if config.minor_upgrade: + pre_upgrade = get_citus_catalog_info(config) + run_test_on_coordinator(config, before_upgrade_schedule) + remove_citus(config.pre_tar_path) if after_upgrade_schedule is None: return @@ -66,9 +74,228 @@ def run_citus_upgrade_tests(config, before_upgrade_schedule, after_upgrade_sched run_alter_citus(config.bindir, config.mixed_mode, config) verify_upgrade(config, config.mixed_mode, config.node_name_to_ports.values()) + # For minor version upgrades, verify GUCs and UDFs does not have breaking changes + breaking_changes = [] + if config.minor_upgrade: + breaking_changes = compare_citus_catalog_info(config, pre_upgrade) + run_test_on_coordinator(config, after_upgrade_schedule) remove_citus(config.post_tar_path) + # Fail the test if there are any breaking changes + if breaking_changes: + common.eprint("\n=== BREAKING CHANGES DETECTED ===") + for change in breaking_changes: + common.eprint(f" - {change}") + common.eprint("==================================\n") + sys.exit(1) + + +def get_citus_catalog_info(config): + + results = {} + # Store GUCs + guc_results = utils.psql_capture( + config.bindir, + config.coordinator_port(), + "SELECT name, boot_val FROM pg_settings WHERE name LIKE 'citus.%' ORDER BY name;", + ) + + guc_lines = guc_results.decode("utf-8").strip().split("\n") + results["gucs"] = {} + for line in guc_lines[2:]: # Skip header lines + name, boot_val = line.split("|") + results["gucs"][name.strip()] = boot_val.strip() + + # Store UDFs + udf_results = utils.psql_capture( + config.bindir, + config.coordinator_port(), + """ + SELECT + n.nspname AS schema_name, + p.proname AS function_name, + pg_get_function_arguments(p.oid) AS full_args, + pg_get_function_result(p.oid) AS return_type + FROM pg_proc p + JOIN pg_namespace n ON n.oid = p.pronamespace + JOIN pg_depend d ON d.objid = p.oid + JOIN pg_extension e ON e.oid = d.refobjid + WHERE e.extname = 'citus' + AND d.deptype = 'e' + ORDER BY schema_name, function_name, full_args; + """, + ) + + udf_lines = udf_results.decode("utf-8").strip().split("\n") + results["udfs"] = {} + for line in udf_lines[2:]: # Skip header lines + schema_name, function_name, full_args, return_type = line.split("|") + key = (schema_name.strip(), function_name.strip()) + signature = (full_args.strip(), return_type.strip()) + + if key not in results["udfs"]: + results["udfs"][key] = set() + results["udfs"][key].add(signature) + + # Store types, exclude composite types (t.typrelid = 0) and + # exclude auto-created array types + # (t.typname LIKE '\_%' AND t.typelem <> 0) + type_results = utils.psql_capture( + config.bindir, + config.coordinator_port(), + """ + SELECT n.nspname, t.typname, t.typtype + FROM pg_type t + JOIN pg_depend d ON d.objid = t.oid + JOIN pg_extension e ON e.oid = d.refobjid + JOIN pg_namespace n ON n.oid = t.typnamespace + WHERE e.extname = 'citus' + AND t.typrelid = 0 + AND NOT (t.typname LIKE '\\_%%' AND t.typelem <> 0) + ORDER BY n.nspname, t.typname; + """, + ) + type_lines = type_results.decode("utf-8").strip().split("\n") + results["types"] = {} + + for line in type_lines[2:]: # Skip header lines + nspname, typname, typtype = line.split("|") + key = (nspname.strip(), typname.strip()) + results["types"][key] = typtype.strip() + + # Store tables and views + table_results = utils.psql_capture( + config.bindir, + config.coordinator_port(), + """ + SELECT n.nspname, c.relname, a.attname, t.typname + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + JOIN pg_attribute a ON a.attrelid = c.oid + JOIN pg_type t ON t.oid = a.atttypid + JOIN pg_depend d ON d.objid = c.oid + JOIN pg_extension e ON e.oid = d.refobjid + WHERE e.extname = 'citus' + AND a.attnum > 0 + AND NOT a.attisdropped + ORDER BY n.nspname, c.relname, a.attname; + """, + ) + + table_lines = table_results.decode("utf-8").strip().split("\n") + results["tables"] = {} + for line in table_lines[2:]: # Skip header lines + nspname, relname, attname, typname = line.split("|") + key = (nspname.strip(), relname.strip()) + + if key not in results["tables"]: + results["tables"][key] = {} + results["tables"][key][attname.strip()] = typname.strip() + + return results + + +def compare_citus_catalog_info(config, pre_upgrade): + post_upgrade = get_citus_catalog_info(config) + breaking_changes = [] + + # Compare GUCs + for name, boot_val in pre_upgrade["gucs"].items(): + if name not in post_upgrade["gucs"]: + breaking_changes.append(f"GUC {name} was removed") + elif post_upgrade["gucs"][name] != boot_val and name != "citus.version": + breaking_changes.append( + f"The default value of GUC {name} was changed from {boot_val} to {post_upgrade['gucs'][name]}" + ) + + # Compare UDFs - check if any pre-upgrade signatures were removed + for (schema_name, function_name), pre_signatures in pre_upgrade["udfs"].items(): + if (schema_name, function_name) not in post_upgrade["udfs"]: + breaking_changes.append( + f"UDF {schema_name}.{function_name} was completely removed" + ) + else: + post_signatures = post_upgrade["udfs"][(schema_name, function_name)] + removed_signatures = pre_signatures - post_signatures + + if removed_signatures: + for full_args, return_type in removed_signatures: + if not find_compatible_udf_signature( + full_args, return_type, post_signatures + ): + breaking_changes.append( + f"UDF signature removed: {schema_name}.{function_name}({full_args}) RETURNS {return_type}" + ) + + # Compare Types - check if any pre-upgrade types were removed or changed + for (nspname, typname), typtype in pre_upgrade["types"].items(): + if (nspname, typname) not in post_upgrade["types"]: + breaking_changes.append(f"Type {nspname}.{typname} was removed") + elif post_upgrade["types"][(nspname, typname)] != typtype: + breaking_changes.append( + f"Type {nspname}.{typname} changed type from {typtype} to {post_upgrade['types'][(nspname, typname)]}" + ) + + # Compare tables / views - check if any pre-upgrade tables or columns were removed or changed + for (nspname, relname), columns in pre_upgrade["tables"].items(): + if (nspname, relname) not in post_upgrade["tables"]: + breaking_changes.append(f"Table/view {nspname}.{relname} was removed") + else: + post_columns = post_upgrade["tables"][(nspname, relname)] + + for col_name, col_type in columns.items(): + if col_name not in post_columns: + breaking_changes.append( + f"Column {col_name} in table/view {nspname}.{relname} was removed" + ) + elif post_columns[col_name] != col_type: + breaking_changes.append( + f"Column {col_name} in table/view {nspname}.{relname} changed type from {col_type} to {post_columns[col_name]}" + ) + + return breaking_changes + + +def find_compatible_udf_signature(full_args, return_type, post_signatures): + + pre_args_list = [arg.strip() for arg in full_args.split(",") if arg.strip()] + + for post_full_args, post_return_type in post_signatures: + if post_return_type == return_type: + post_args_list = [ + arg.strip() for arg in post_full_args.split(",") if arg.strip() + ] + """ Here check if the function signatures are compatible, they are compatible if: + post_args_list has all the arguments of pre_args_list in the same order, but can have + additional arguments with default values """ + pre_index = 0 + post_index = 0 + compatible = True + while pre_index < len(pre_args_list) and post_index < len(post_args_list): + if pre_args_list[pre_index] == post_args_list[post_index]: + pre_index += 1 + else: + # Check if the argument in post_args_list has a default value + if "default" not in post_args_list[post_index].lower(): + compatible = False + break + post_index += 1 + if pre_index < len(pre_args_list): + compatible = False + continue + + while post_index < len(post_args_list): + if "default" not in post_args_list[post_index].lower(): + compatible = False + break + post_index += 1 + + if compatible: + return True + + return False + def install_citus(tar_path): if tar_path: diff --git a/src/test/regress/expected/upgrade_citus_finish_citus_upgrade_2.out b/src/test/regress/expected/upgrade_citus_finish_citus_upgrade_2.out new file mode 100644 index 000000000..50233c445 --- /dev/null +++ b/src/test/regress/expected/upgrade_citus_finish_citus_upgrade_2.out @@ -0,0 +1,30 @@ +-- Citus upgrades are finished by calling a procedure +-- this is a transactional procedure, so rollback should be fine +BEGIN; + CALL citus_finish_citus_upgrade(); +NOTICE: already at the latest distributed schema version (13.2-1) +ROLLBACK; +-- do the actual job +CALL citus_finish_citus_upgrade(); +NOTICE: already at the latest distributed schema version (13.2-1) +-- show that the upgrade is successfull +SELECT metadata->>'last_upgrade_version' = extversion +FROM pg_dist_node_metadata, pg_extension WHERE extname = 'citus'; + ?column? +--------------------------------------------------------------------- + f +(1 row) + +-- idempotent, should be called multiple times +-- still, do not NOTICE the version as it changes per release +SET client_min_messages TO WARNING; +CALL citus_finish_citus_upgrade(); +-- we should be able to sync metadata in nontransactional way as well +SET citus.metadata_sync_mode TO 'nontransactional'; +SELECT start_metadata_sync_to_all_nodes(); + start_metadata_sync_to_all_nodes +--------------------------------------------------------------------- + t +(1 row) + +RESET citus.metadata_sync_mode;