mirror of https://github.com/citusdata/citus.git
This PR introduces infrastructure and validation to detect breaking
changes during Citus minor version upgrades, designed to run in release
branches only.
**Breaking change detection:**
- [GUCs] Detects removed GUCs and changes to default values
- [UDFs] Detects removed functions and function signature changes --
Supports backward-compatible function overloading (new optional
parameters allowed)
- [types] Detects removed data types
- [tables/views] Detects removed tables/views and removed/changed
columns
- New make targets for minor version upgrade tests
- Follow-up PRs will add test schedules with different upgrade scenarios
The test will be enabled in release branches (e.g., release-13) via the
new test-citus-minor-upgrade job shown below. It will not run on the
main branch.
Testing
Verified locally with sample breaking changes:
`make check-citus-minor-upgrade-local citus-old-version=v13.2.0 `
**Test case 1:** Backward-compatible signature change (allowed)
```
-- Old: CREATE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer)
-- New: CREATE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer, pBlockedByPid integer DEFAULT NULL)
```
No breaking change detected (new parameter has DEFAULT)
**Test case 2:** Incompatible signature change (breaking)
```
-- Old: CREATE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer)
-- New: CREATE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer, pBlockedByPid integer)
```
Breaking change detected:
`UDF signature removed: pg_catalog.citus_blocking_pids(pblockedpid
integer) RETURNS integer[]`
**Test case 3:** GUC changes (breaking)
- Removed `citus.max_worker_nodes_tracked`
- Changed default value of `citus.max_shared_pool_size` from 0 to 4
Breaking change detected:
```
The default value of GUC citus.max_shared_pool_size was changed from 0 to 4
GUC citus.max_worker_nodes_tracked was removed
```
**Test case 4:** Table/view changes
- Dropped `pg_catalog.pg_dist_rebalance_strategy` and removed a column
from `pg_catalog.citus_lock_waits`
```
- Column blocking_nodeid in table/view pg_catalog.citus_lock_waits was removed
- Table/view pg_catalog.pg_dist_rebalance_strategy was removed
```
**Test case 5:** Remove a custom type
- Dropped `cluster_clock` and the objects depend on it. In addition to
the dependent objects, test shows:
```
- Type pg_catalog.cluster_clock was removed
```
Sample new job for build and test workflow (for release branches):
```
test-citus-minor-upgrade:
name: PG17 - check-citus-minor-upgrade
runs-on: ubuntu-latest
container:
image: "${{ needs.params.outputs.citusupgrade_image_name }}:${{ fromJson(needs.params.outputs.pg17_version).full }}${{ needs.params.outputs.image_suffix }}"
options: --user root
needs:
- params
- build
env:
citus_version: 13.2
steps:
- uses: actions/checkout@v4
- uses: "./.github/actions/setup_extension"
with:
skip_installation: true
- name: Install and test citus minor version upgrade
run: |-
gosu circleci \
make -C src/test/regress \
check-citus-minor-upgrade \
bindir=/usr/lib/postgresql/${PG_MAJOR}/bin \
citus-pre-tar=/install-pg${PG_MAJOR}-citus${citus_version}.tar \
citus-post-tar=${GITHUB_WORKSPACE}/install-$PG_MAJOR.tar;
- uses: "./.github/actions/save_logs_and_results"
if: always()
with:
folder: ${{ env.PG_MAJOR }}_citus_minor_upgrade
- uses: "./.github/actions/upload_coverage"
if: always()
with:
flags: ${{ env.PG_MAJOR }}_citus_minor_upgrade
codecov_token: ${{ secrets.CODECOV_TOKEN }}
```
(Cherry-picked from https://github.com/citusdata/citus/pull/8334 )
pull/8341/head
parent
9aa1384d9d
commit
6ef177c1a3
|
|
@ -320,6 +320,21 @@ check-citus-upgrade-mixed-local: all clean-upgrade-artifacts
|
||||||
--citus-old-version=$(citus-old-version) \
|
--citus-old-version=$(citus-old-version) \
|
||||||
--mixed
|
--mixed
|
||||||
|
|
||||||
|
check-citus-minor-upgrade: all
|
||||||
|
$(citus_upgrade_check) \
|
||||||
|
--bindir=$(bindir) \
|
||||||
|
--pgxsdir=$(pgxsdir) \
|
||||||
|
--citus-pre-tar=$(citus-pre-tar) \
|
||||||
|
--citus-post-tar=$(citus-post-tar) \
|
||||||
|
--minor-upgrade
|
||||||
|
|
||||||
|
check-citus-minor-upgrade-local: all clean-upgrade-artifacts
|
||||||
|
$(citus_upgrade_check) \
|
||||||
|
--bindir=$(bindir) \
|
||||||
|
--pgxsdir=$(pgxsdir) \
|
||||||
|
--citus-old-version=$(citus-old-version) \
|
||||||
|
--minor-upgrade
|
||||||
|
|
||||||
clean-upgrade-artifacts:
|
clean-upgrade-artifacts:
|
||||||
rm -rf $(citus_abs_srcdir)/tmp_citus_upgrade/ /tmp/citus_copy/
|
rm -rf $(citus_abs_srcdir)/tmp_citus_upgrade/ /tmp/citus_copy/
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -194,6 +194,7 @@ class CitusUpgradeConfig(CitusBaseClusterConfig):
|
||||||
self.new_settings = {"citus.enable_version_checks": "false"}
|
self.new_settings = {"citus.enable_version_checks": "false"}
|
||||||
self.user = SUPER_USER_NAME
|
self.user = SUPER_USER_NAME
|
||||||
self.mixed_mode = arguments["--mixed"]
|
self.mixed_mode = arguments["--mixed"]
|
||||||
|
self.minor_upgrade = arguments.get("--minor-upgrade", False)
|
||||||
self.fixed_port = 57635
|
self.fixed_port = 57635
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@ Options:
|
||||||
--pgxsdir=<pgxsdir> Path to the PGXS directory(ex: ~/.pgenv/src/postgresql-11.3)
|
--pgxsdir=<pgxsdir> Path to the PGXS directory(ex: ~/.pgenv/src/postgresql-11.3)
|
||||||
--citus-old-version=<citus-old-version> Citus old version for local run(ex v8.0.0)
|
--citus-old-version=<citus-old-version> Citus old version for local run(ex v8.0.0)
|
||||||
--mixed Run the verification phase with one node not upgraded.
|
--mixed Run the verification phase with one node not upgraded.
|
||||||
|
--minor-upgrade Use minor version upgrade test schedules instead of major version schedules.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import multiprocessing
|
import multiprocessing
|
||||||
|
|
@ -55,7 +56,14 @@ def run_citus_upgrade_tests(config, before_upgrade_schedule, after_upgrade_sched
|
||||||
)
|
)
|
||||||
|
|
||||||
report_initial_version(config)
|
report_initial_version(config)
|
||||||
|
|
||||||
|
# Store the pre-upgrade GUCs and UDFs for minor version upgrades
|
||||||
|
pre_upgrade = None
|
||||||
|
if config.minor_upgrade:
|
||||||
|
pre_upgrade = get_citus_catalog_info(config)
|
||||||
|
|
||||||
run_test_on_coordinator(config, before_upgrade_schedule)
|
run_test_on_coordinator(config, before_upgrade_schedule)
|
||||||
|
|
||||||
remove_citus(config.pre_tar_path)
|
remove_citus(config.pre_tar_path)
|
||||||
if after_upgrade_schedule is None:
|
if after_upgrade_schedule is None:
|
||||||
return
|
return
|
||||||
|
|
@ -66,9 +74,228 @@ def run_citus_upgrade_tests(config, before_upgrade_schedule, after_upgrade_sched
|
||||||
run_alter_citus(config.bindir, config.mixed_mode, config)
|
run_alter_citus(config.bindir, config.mixed_mode, config)
|
||||||
verify_upgrade(config, config.mixed_mode, config.node_name_to_ports.values())
|
verify_upgrade(config, config.mixed_mode, config.node_name_to_ports.values())
|
||||||
|
|
||||||
|
# For minor version upgrades, verify GUCs and UDFs does not have breaking changes
|
||||||
|
breaking_changes = []
|
||||||
|
if config.minor_upgrade:
|
||||||
|
breaking_changes = compare_citus_catalog_info(config, pre_upgrade)
|
||||||
|
|
||||||
run_test_on_coordinator(config, after_upgrade_schedule)
|
run_test_on_coordinator(config, after_upgrade_schedule)
|
||||||
remove_citus(config.post_tar_path)
|
remove_citus(config.post_tar_path)
|
||||||
|
|
||||||
|
# Fail the test if there are any breaking changes
|
||||||
|
if breaking_changes:
|
||||||
|
common.eprint("\n=== BREAKING CHANGES DETECTED ===")
|
||||||
|
for change in breaking_changes:
|
||||||
|
common.eprint(f" - {change}")
|
||||||
|
common.eprint("==================================\n")
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
|
|
||||||
|
def get_citus_catalog_info(config):
|
||||||
|
|
||||||
|
results = {}
|
||||||
|
# Store GUCs
|
||||||
|
guc_results = utils.psql_capture(
|
||||||
|
config.bindir,
|
||||||
|
config.coordinator_port(),
|
||||||
|
"SELECT name, boot_val FROM pg_settings WHERE name LIKE 'citus.%' ORDER BY name;",
|
||||||
|
)
|
||||||
|
|
||||||
|
guc_lines = guc_results.decode("utf-8").strip().split("\n")
|
||||||
|
results["gucs"] = {}
|
||||||
|
for line in guc_lines[2:]: # Skip header lines
|
||||||
|
name, boot_val = line.split("|")
|
||||||
|
results["gucs"][name.strip()] = boot_val.strip()
|
||||||
|
|
||||||
|
# Store UDFs
|
||||||
|
udf_results = utils.psql_capture(
|
||||||
|
config.bindir,
|
||||||
|
config.coordinator_port(),
|
||||||
|
"""
|
||||||
|
SELECT
|
||||||
|
n.nspname AS schema_name,
|
||||||
|
p.proname AS function_name,
|
||||||
|
pg_get_function_arguments(p.oid) AS full_args,
|
||||||
|
pg_get_function_result(p.oid) AS return_type
|
||||||
|
FROM pg_proc p
|
||||||
|
JOIN pg_namespace n ON n.oid = p.pronamespace
|
||||||
|
JOIN pg_depend d ON d.objid = p.oid
|
||||||
|
JOIN pg_extension e ON e.oid = d.refobjid
|
||||||
|
WHERE e.extname = 'citus'
|
||||||
|
AND d.deptype = 'e'
|
||||||
|
ORDER BY schema_name, function_name, full_args;
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
|
||||||
|
udf_lines = udf_results.decode("utf-8").strip().split("\n")
|
||||||
|
results["udfs"] = {}
|
||||||
|
for line in udf_lines[2:]: # Skip header lines
|
||||||
|
schema_name, function_name, full_args, return_type = line.split("|")
|
||||||
|
key = (schema_name.strip(), function_name.strip())
|
||||||
|
signature = (full_args.strip(), return_type.strip())
|
||||||
|
|
||||||
|
if key not in results["udfs"]:
|
||||||
|
results["udfs"][key] = set()
|
||||||
|
results["udfs"][key].add(signature)
|
||||||
|
|
||||||
|
# Store types, exclude composite types (t.typrelid = 0) and
|
||||||
|
# exclude auto-created array types
|
||||||
|
# (t.typname LIKE '\_%' AND t.typelem <> 0)
|
||||||
|
type_results = utils.psql_capture(
|
||||||
|
config.bindir,
|
||||||
|
config.coordinator_port(),
|
||||||
|
"""
|
||||||
|
SELECT n.nspname, t.typname, t.typtype
|
||||||
|
FROM pg_type t
|
||||||
|
JOIN pg_depend d ON d.objid = t.oid
|
||||||
|
JOIN pg_extension e ON e.oid = d.refobjid
|
||||||
|
JOIN pg_namespace n ON n.oid = t.typnamespace
|
||||||
|
WHERE e.extname = 'citus'
|
||||||
|
AND t.typrelid = 0
|
||||||
|
AND NOT (t.typname LIKE '\\_%%' AND t.typelem <> 0)
|
||||||
|
ORDER BY n.nspname, t.typname;
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
type_lines = type_results.decode("utf-8").strip().split("\n")
|
||||||
|
results["types"] = {}
|
||||||
|
|
||||||
|
for line in type_lines[2:]: # Skip header lines
|
||||||
|
nspname, typname, typtype = line.split("|")
|
||||||
|
key = (nspname.strip(), typname.strip())
|
||||||
|
results["types"][key] = typtype.strip()
|
||||||
|
|
||||||
|
# Store tables and views
|
||||||
|
table_results = utils.psql_capture(
|
||||||
|
config.bindir,
|
||||||
|
config.coordinator_port(),
|
||||||
|
"""
|
||||||
|
SELECT n.nspname, c.relname, a.attname, t.typname
|
||||||
|
FROM pg_class c
|
||||||
|
JOIN pg_namespace n ON n.oid = c.relnamespace
|
||||||
|
JOIN pg_attribute a ON a.attrelid = c.oid
|
||||||
|
JOIN pg_type t ON t.oid = a.atttypid
|
||||||
|
JOIN pg_depend d ON d.objid = c.oid
|
||||||
|
JOIN pg_extension e ON e.oid = d.refobjid
|
||||||
|
WHERE e.extname = 'citus'
|
||||||
|
AND a.attnum > 0
|
||||||
|
AND NOT a.attisdropped
|
||||||
|
ORDER BY n.nspname, c.relname, a.attname;
|
||||||
|
""",
|
||||||
|
)
|
||||||
|
|
||||||
|
table_lines = table_results.decode("utf-8").strip().split("\n")
|
||||||
|
results["tables"] = {}
|
||||||
|
for line in table_lines[2:]: # Skip header lines
|
||||||
|
nspname, relname, attname, typname = line.split("|")
|
||||||
|
key = (nspname.strip(), relname.strip())
|
||||||
|
|
||||||
|
if key not in results["tables"]:
|
||||||
|
results["tables"][key] = {}
|
||||||
|
results["tables"][key][attname.strip()] = typname.strip()
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
||||||
|
|
||||||
|
def compare_citus_catalog_info(config, pre_upgrade):
|
||||||
|
post_upgrade = get_citus_catalog_info(config)
|
||||||
|
breaking_changes = []
|
||||||
|
|
||||||
|
# Compare GUCs
|
||||||
|
for name, boot_val in pre_upgrade["gucs"].items():
|
||||||
|
if name not in post_upgrade["gucs"]:
|
||||||
|
breaking_changes.append(f"GUC {name} was removed")
|
||||||
|
elif post_upgrade["gucs"][name] != boot_val and name != "citus.version":
|
||||||
|
breaking_changes.append(
|
||||||
|
f"The default value of GUC {name} was changed from {boot_val} to {post_upgrade['gucs'][name]}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Compare UDFs - check if any pre-upgrade signatures were removed
|
||||||
|
for (schema_name, function_name), pre_signatures in pre_upgrade["udfs"].items():
|
||||||
|
if (schema_name, function_name) not in post_upgrade["udfs"]:
|
||||||
|
breaking_changes.append(
|
||||||
|
f"UDF {schema_name}.{function_name} was completely removed"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
post_signatures = post_upgrade["udfs"][(schema_name, function_name)]
|
||||||
|
removed_signatures = pre_signatures - post_signatures
|
||||||
|
|
||||||
|
if removed_signatures:
|
||||||
|
for full_args, return_type in removed_signatures:
|
||||||
|
if not find_compatible_udf_signature(
|
||||||
|
full_args, return_type, post_signatures
|
||||||
|
):
|
||||||
|
breaking_changes.append(
|
||||||
|
f"UDF signature removed: {schema_name}.{function_name}({full_args}) RETURNS {return_type}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Compare Types - check if any pre-upgrade types were removed or changed
|
||||||
|
for (nspname, typname), typtype in pre_upgrade["types"].items():
|
||||||
|
if (nspname, typname) not in post_upgrade["types"]:
|
||||||
|
breaking_changes.append(f"Type {nspname}.{typname} was removed")
|
||||||
|
elif post_upgrade["types"][(nspname, typname)] != typtype:
|
||||||
|
breaking_changes.append(
|
||||||
|
f"Type {nspname}.{typname} changed type from {typtype} to {post_upgrade['types'][(nspname, typname)]}"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Compare tables / views - check if any pre-upgrade tables or columns were removed or changed
|
||||||
|
for (nspname, relname), columns in pre_upgrade["tables"].items():
|
||||||
|
if (nspname, relname) not in post_upgrade["tables"]:
|
||||||
|
breaking_changes.append(f"Table/view {nspname}.{relname} was removed")
|
||||||
|
else:
|
||||||
|
post_columns = post_upgrade["tables"][(nspname, relname)]
|
||||||
|
|
||||||
|
for col_name, col_type in columns.items():
|
||||||
|
if col_name not in post_columns:
|
||||||
|
breaking_changes.append(
|
||||||
|
f"Column {col_name} in table/view {nspname}.{relname} was removed"
|
||||||
|
)
|
||||||
|
elif post_columns[col_name] != col_type:
|
||||||
|
breaking_changes.append(
|
||||||
|
f"Column {col_name} in table/view {nspname}.{relname} changed type from {col_type} to {post_columns[col_name]}"
|
||||||
|
)
|
||||||
|
|
||||||
|
return breaking_changes
|
||||||
|
|
||||||
|
|
||||||
|
def find_compatible_udf_signature(full_args, return_type, post_signatures):
|
||||||
|
|
||||||
|
pre_args_list = [arg.strip() for arg in full_args.split(",") if arg.strip()]
|
||||||
|
|
||||||
|
for post_full_args, post_return_type in post_signatures:
|
||||||
|
if post_return_type == return_type:
|
||||||
|
post_args_list = [
|
||||||
|
arg.strip() for arg in post_full_args.split(",") if arg.strip()
|
||||||
|
]
|
||||||
|
""" Here check if the function signatures are compatible, they are compatible if:
|
||||||
|
post_args_list has all the arguments of pre_args_list in the same order, but can have
|
||||||
|
additional arguments with default values """
|
||||||
|
pre_index = 0
|
||||||
|
post_index = 0
|
||||||
|
compatible = True
|
||||||
|
while pre_index < len(pre_args_list) and post_index < len(post_args_list):
|
||||||
|
if pre_args_list[pre_index] == post_args_list[post_index]:
|
||||||
|
pre_index += 1
|
||||||
|
else:
|
||||||
|
# Check if the argument in post_args_list has a default value
|
||||||
|
if "default" not in post_args_list[post_index].lower():
|
||||||
|
compatible = False
|
||||||
|
break
|
||||||
|
post_index += 1
|
||||||
|
if pre_index < len(pre_args_list):
|
||||||
|
compatible = False
|
||||||
|
continue
|
||||||
|
|
||||||
|
while post_index < len(post_args_list):
|
||||||
|
if "default" not in post_args_list[post_index].lower():
|
||||||
|
compatible = False
|
||||||
|
break
|
||||||
|
post_index += 1
|
||||||
|
|
||||||
|
if compatible:
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def install_citus(tar_path):
|
def install_citus(tar_path):
|
||||||
if tar_path:
|
if tar_path:
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,30 @@
|
||||||
|
-- Citus upgrades are finished by calling a procedure
|
||||||
|
-- this is a transactional procedure, so rollback should be fine
|
||||||
|
BEGIN;
|
||||||
|
CALL citus_finish_citus_upgrade();
|
||||||
|
NOTICE: already at the latest distributed schema version (13.2-1)
|
||||||
|
ROLLBACK;
|
||||||
|
-- do the actual job
|
||||||
|
CALL citus_finish_citus_upgrade();
|
||||||
|
NOTICE: already at the latest distributed schema version (13.2-1)
|
||||||
|
-- show that the upgrade is successfull
|
||||||
|
SELECT metadata->>'last_upgrade_version' = extversion
|
||||||
|
FROM pg_dist_node_metadata, pg_extension WHERE extname = 'citus';
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
f
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- idempotent, should be called multiple times
|
||||||
|
-- still, do not NOTICE the version as it changes per release
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
|
CALL citus_finish_citus_upgrade();
|
||||||
|
-- we should be able to sync metadata in nontransactional way as well
|
||||||
|
SET citus.metadata_sync_mode TO 'nontransactional';
|
||||||
|
SELECT start_metadata_sync_to_all_nodes();
|
||||||
|
start_metadata_sync_to_all_nodes
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
RESET citus.metadata_sync_mode;
|
||||||
Loading…
Reference in New Issue