mirror of https://github.com/citusdata/citus.git
Merge branch 'main' into propagate_doc
commit 2f382fe961
@@ -3,3 +3,31 @@
 # actually also works when debugging with vscode. Providing nice tools
 # to understand the internal datastructures we are working with.
 source /root/gdbpg.py
+
+# when debugging postgres it is convenient to _always_ have a breakpoint
+# trigger when an error is logged. Because .gdbinit is sourced before gdb
+# is fully attached and has the sources loaded. To make sure the breakpoint
+# is added when the library is loaded we temporarily set breakpoint pending
+# to on. After we have added our breakpoint we revert back to the default
+# configuration for breakpoint pending.
+# The breakpoint is hard to read, but at entry of the function we don't have
+# the level loaded in elevel. Instead we hardcode the location where the
+# level of the current error is stored. Also gdb doesn't understand the
+# ERROR symbol so we hardcode this to the value of ERROR. It is very unlikely
+# this value will ever change in postgres, but if it does we might need to
+# find a way to conditionally load the correct breakpoint.
+set breakpoint pending on
+break elog.c:errfinish if errordata[errordata_stack_depth].elevel == 21
+set breakpoint pending auto
+
+echo \n
+echo ----------------------------------------------------------------------------------\n
+echo when attaching to a postgres backend a breakpoint will be set on elog.c:errfinish \n
+echo it will only break on errors being raised in postgres \n
+echo \n
+echo to disable this breakpoint from vscode run `-exec disable 1` in the debug console \n
+echo this assumes it's the first breakpoint loaded as it is loaded from .gdbinit \n
+echo this can be verified with `-exec info break`, enabling can be done with \n
+echo `-exec enable 1` \n
+echo ----------------------------------------------------------------------------------\n
+echo \n
@@ -13,10 +13,33 @@ on:
   pull_request:
     types: [opened, reopened,synchronize]
 jobs:
+  # Since GHA does not interpolate env variables in matrix context, we need to
+  # define them in a separate job and use them in other jobs.
+  params:
+    runs-on: ubuntu-latest
+    name: Initialize parameters
+    outputs:
+      build_image_name: "citus/extbuilder"
+      test_image_name: "citus/exttester"
+      citusupgrade_image_name: "citus/citusupgradetester"
+      fail_test_image_name: "citus/failtester"
+      pgupgrade_image_name: "citus/pgupgradetester"
+      style_checker_image_name: "citus/stylechecker"
+      style_checker_tools_version: "0.8.18"
+      image_suffix: "-v9d71045"
+      pg14_version: '{ "major": "14", "full": "14.9" }'
+      pg15_version: '{ "major": "15", "full": "15.4" }'
+      pg16_version: '{ "major": "16", "full": "16.0" }'
+      upgrade_pg_versions: "14.9-15.4-16.0"
+    steps:
+      # Since GHA jobs need at least one step we use a no-op step here.
+      - name: Set up parameters
+        run: echo 'noop'
   check-sql-snapshots:
+    needs: params
     runs-on: ubuntu-20.04
     container:
-      image: ${{ vars.build_image_name }}:latest
+      image: ${{ needs.params.outputs.build_image_name }}:latest
       options: --user root
     steps:
     - uses: actions/checkout@v3.5.0
@@ -25,9 +48,10 @@ jobs:
         git config --global --add safe.directory ${GITHUB_WORKSPACE}
         ci/check_sql_snapshots.sh
   check-style:
+    needs: params
     runs-on: ubuntu-20.04
     container:
-      image: ${{ vars.style_checker_image_name }}:${{ vars.style_checker_tools_version }}${{ vars.image_suffix }}
+      image: ${{ needs.params.outputs.style_checker_image_name }}:${{ needs.params.outputs.style_checker_tools_version }}${{ needs.params.outputs.image_suffix }}
     steps:
     - name: Check Snapshots
       run: |
@@ -68,21 +92,22 @@ jobs:
     - name: Check for missing downgrade scripts
       run: ci/check_migration_files.sh
   build:
-    name: Build for PG ${{ matrix.pg_version}}
+    needs: params
+    name: Build for PG${{ fromJson(matrix.pg_version).major }}
     strategy:
       fail-fast: false
       matrix:
         image_name:
-          - ${{ vars.build_image_name }}
+          - ${{ needs.params.outputs.build_image_name }}
        image_suffix:
-          - ${{ vars.image_suffix}}
+          - ${{ needs.params.outputs.image_suffix}}
        pg_version:
-          - ${{ vars.pg14_version }}
-          - ${{ vars.pg15_version }}
-          - ${{ vars.pg16_version }}
+          - ${{ needs.params.outputs.pg14_version }}
+          - ${{ needs.params.outputs.pg15_version }}
+          - ${{ needs.params.outputs.pg16_version }}
     runs-on: ubuntu-20.04
     container:
-      image: "${{ matrix.image_name }}:${{ matrix.pg_version }}${{ matrix.image_suffix }}"
+      image: "${{ matrix.image_name }}:${{ fromJson(matrix.pg_version).full }}${{ matrix.image_suffix }}"
       options: --user root
     steps:
     - uses: actions/checkout@v3.5.0
@@ -99,18 +124,18 @@ jobs:
         ./build-${{ env.PG_MAJOR }}/*
         ./install-${{ env.PG_MAJOR }}.tar
   test-citus:
-    name: PG${{ matrix.pg_version }} - ${{ matrix.make }}
+    name: PG${{ fromJson(matrix.pg_version).major }} - ${{ matrix.make }}
     strategy:
       fail-fast: false
       matrix:
        suite:
          - regress
        image_name:
-         - ${{ vars.test_image_name }}
+         - ${{ needs.params.outputs.test_image_name }}
        pg_version:
-         - ${{ vars.pg14_version }}
-         - ${{ vars.pg15_version }}
-         - ${{ vars.pg16_version }}
+         - ${{ needs.params.outputs.pg14_version }}
+         - ${{ needs.params.outputs.pg15_version }}
+         - ${{ needs.params.outputs.pg16_version }}
        make:
          - check-split
          - check-multi
@@ -129,69 +154,70 @@ jobs:
          - check-enterprise-isolation-logicalrep-3
        include:
          - make: check-failure
-           pg_version: ${{ vars.pg14_version }}
+           pg_version: ${{ needs.params.outputs.pg14_version }}
            suite: regress
-           image_name: ${{ vars.fail_test_image_name }}
+           image_name: ${{ needs.params.outputs.fail_test_image_name }}
          - make: check-failure
-           pg_version: ${{ vars.pg15_version }}
+           pg_version: ${{ needs.params.outputs.pg15_version }}
            suite: regress
-           image_name: ${{ vars.fail_test_image_name }}
+           image_name: ${{ needs.params.outputs.fail_test_image_name }}
          - make: check-failure
-           pg_version: ${{ vars.pg16_version }}
+           pg_version: ${{ needs.params.outputs.pg16_version }}
            suite: regress
-           image_name: ${{ vars.fail_test_image_name }}
+           image_name: ${{ needs.params.outputs.fail_test_image_name }}
          - make: check-enterprise-failure
-           pg_version: ${{ vars.pg14_version }}
+           pg_version: ${{ needs.params.outputs.pg14_version }}
            suite: regress
-           image_name: ${{ vars.fail_test_image_name }}
+           image_name: ${{ needs.params.outputs.fail_test_image_name }}
          - make: check-enterprise-failure
-           pg_version: ${{ vars.pg15_version }}
+           pg_version: ${{ needs.params.outputs.pg15_version }}
            suite: regress
-           image_name: ${{ vars.fail_test_image_name }}
+           image_name: ${{ needs.params.outputs.fail_test_image_name }}
          - make: check-enterprise-failure
-           pg_version: ${{ vars.pg16_version }}
+           pg_version: ${{ needs.params.outputs.pg16_version }}
            suite: regress
-           image_name: ${{ vars.fail_test_image_name }}
+           image_name: ${{ needs.params.outputs.fail_test_image_name }}
          - make: check-pytest
-           pg_version: ${{ vars.pg14_version }}
+           pg_version: ${{ needs.params.outputs.pg14_version }}
            suite: regress
-           image_name: ${{ vars.fail_test_image_name }}
+           image_name: ${{ needs.params.outputs.fail_test_image_name }}
          - make: check-pytest
-           pg_version: ${{ vars.pg15_version }}
+           pg_version: ${{ needs.params.outputs.pg15_version }}
            suite: regress
-           image_name: ${{ vars.fail_test_image_name }}
+           image_name: ${{ needs.params.outputs.fail_test_image_name }}
          - make: check-pytest
-           pg_version: ${{ vars.pg16_version }}
+           pg_version: ${{ needs.params.outputs.pg16_version }}
            suite: regress
-           image_name: ${{ vars.fail_test_image_name }}
+           image_name: ${{ needs.params.outputs.fail_test_image_name }}
          - make: installcheck
            suite: cdc
-           image_name: ${{ vars.test_image_name }}
-           pg_version: ${{ vars.pg15_version }}
+           image_name: ${{ needs.params.outputs.test_image_name }}
+           pg_version: ${{ needs.params.outputs.pg15_version }}
          - make: installcheck
            suite: cdc
-           image_name: ${{ vars.test_image_name }}
-           pg_version: ${{ vars.pg16_version }}
+           image_name: ${{ needs.params.outputs.test_image_name }}
+           pg_version: ${{ needs.params.outputs.pg16_version }}
          - make: check-query-generator
-           pg_version: ${{ vars.pg14_version }}
+           pg_version: ${{ needs.params.outputs.pg14_version }}
            suite: regress
-           image_name: ${{ vars.fail_test_image_name }}
+           image_name: ${{ needs.params.outputs.fail_test_image_name }}
          - make: check-query-generator
-           pg_version: ${{ vars.pg15_version }}
+           pg_version: ${{ needs.params.outputs.pg15_version }}
            suite: regress
-           image_name: ${{ vars.fail_test_image_name }}
+           image_name: ${{ needs.params.outputs.fail_test_image_name }}
          - make: check-query-generator
-           pg_version: ${{ vars.pg16_version }}
+           pg_version: ${{ needs.params.outputs.pg16_version }}
            suite: regress
-           image_name: ${{ vars.fail_test_image_name }}
+           image_name: ${{ needs.params.outputs.fail_test_image_name }}
     runs-on: ubuntu-20.04
     container:
-      image: "${{ matrix.image_name }}:${{ matrix.pg_version }}${{ vars.image_suffix }}"
+      image: "${{ matrix.image_name }}:${{ fromJson(matrix.pg_version).full }}${{ needs.params.outputs.image_suffix }}"
       options: --user root --dns=8.8.8.8
       # Because GitHub creates a default network for each job, we need to use
      # --dns= to have similar DNS settings as our other CI systems or local
      # machines. Otherwise, we may see different results.
     needs:
+    - params
     - build
     steps:
     - uses: actions/checkout@v3.5.0
@@ -202,29 +228,30 @@ jobs:
     - uses: "./.github/actions/save_logs_and_results"
       if: always()
       with:
-        folder: ${{ matrix.pg_version }}_${{ matrix.make }}
+        folder: ${{ fromJson(matrix.pg_version).major }}_${{ matrix.make }}
     - uses: "./.github/actions/upload_coverage"
       if: always()
       with:
         flags: ${{ env.PG_MAJOR }}_${{ matrix.suite }}_${{ matrix.make }}
         codecov_token: ${{ secrets.CODECOV_TOKEN }}
   test-arbitrary-configs:
-    name: PG${{ matrix.pg_version }} - check-arbitrary-configs-${{ matrix.parallel }}
+    name: PG${{ fromJson(matrix.pg_version).major }} - check-arbitrary-configs-${{ matrix.parallel }}
     runs-on: ["self-hosted", "1ES.Pool=1es-gha-citusdata-pool"]
     container:
-      image: "${{ matrix.image_name }}:${{ matrix.pg_version }}${{ vars.image_suffix }}"
+      image: "${{ matrix.image_name }}:${{ fromJson(matrix.pg_version).full }}${{ needs.params.outputs.image_suffix }}"
       options: --user root
     needs:
+    - params
     - build
     strategy:
       fail-fast: false
       matrix:
        image_name:
-         - ${{ vars.fail_test_image_name }}
+         - ${{ needs.params.outputs.fail_test_image_name }}
        pg_version:
-         - ${{ vars.pg14_version }}
-         - ${{ vars.pg15_version }}
-         - ${{ vars.pg16_version }}
+         - ${{ needs.params.outputs.pg14_version }}
+         - ${{ needs.params.outputs.pg15_version }}
+         - ${{ needs.params.outputs.pg16_version }}
        parallel: [0,1,2,3,4,5] # workaround for running 6 parallel jobs
     steps:
     - uses: actions/checkout@v3.5.0
@@ -258,9 +285,10 @@ jobs:
     name: PG${{ matrix.old_pg_major }}-PG${{ matrix.new_pg_major }} - check-pg-upgrade
     runs-on: ubuntu-20.04
     container:
-      image: "${{ vars.pgupgrade_image_name }}:${{ vars.upgrade_pg_versions }}${{ vars.image_suffix }}"
+      image: "${{ needs.params.outputs.pgupgrade_image_name }}:${{ needs.params.outputs.upgrade_pg_versions }}${{ needs.params.outputs.image_suffix }}"
       options: --user root
     needs:
+    - params
     - build
     strategy:
       fail-fast: false
@@ -305,12 +333,13 @@ jobs:
         flags: ${{ env.old_pg_major }}_${{ env.new_pg_major }}_upgrade
         codecov_token: ${{ secrets.CODECOV_TOKEN }}
   test-citus-upgrade:
-    name: PG${{ vars.pg14_version }} - check-citus-upgrade
+    name: PG${{ fromJson(needs.params.outputs.pg14_version).major }} - check-citus-upgrade
     runs-on: ubuntu-20.04
     container:
-      image: "${{ vars.citusupgrade_image_name }}:${{ vars.pg14_version }}${{ vars.image_suffix }}"
+      image: "${{ needs.params.outputs.citusupgrade_image_name }}:${{ fromJson(needs.params.outputs.pg14_version).full }}${{ needs.params.outputs.image_suffix }}"
       options: --user root
     needs:
+    - params
     - build
     steps:
     - uses: actions/checkout@v3.5.0
@@ -354,8 +383,9 @@ jobs:
       CC_TEST_REPORTER_ID: ${{ secrets.CC_TEST_REPORTER_ID }}
     runs-on: ubuntu-20.04
     container:
-      image: ${{ vars.test_image_name }}:${{ vars.pg16_version }}${{ vars.image_suffix }}
+      image: ${{ needs.params.outputs.test_image_name }}:${{ fromJson(needs.params.outputs.pg16_version).full }}${{ needs.params.outputs.image_suffix }}
     needs:
+    - params
     - test-citus
     - test-arbitrary-configs
     - test-citus-upgrade
@@ -448,11 +478,12 @@ jobs:
     name: Test flakyness
     runs-on: ubuntu-20.04
     container:
-      image: ${{ vars.fail_test_image_name }}:${{ vars.pg16_version }}${{ vars.image_suffix }}
+      image: ${{ needs.params.outputs.fail_test_image_name }}:${{ needs.params.outputs.pg16_version }}${{ needs.params.outputs.image_suffix }}
       options: --user root
     env:
       runs: 8
     needs:
+    - params
     - build
     - test-flakyness-pre
     - prepare_parallelization_matrix_32
@@ -24,14 +24,16 @@ jobs:
     - name: Get Postgres Versions
       id: get-postgres-versions
       run: |
-        # Postgres versions are stored in .circleci/config.yml file in "build-[pg-version] format. Below command
-        # extracts the versions and get the unique values.
-        pg_versions=`grep -Eo 'build-[[:digit:]]{2}' .circleci/config.yml|sed -e "s/^build-//"|sort|uniq|tr '\n' ','| head -c -1`
+        set -euxo pipefail
+        # Postgres versions are stored in .github/workflows/build_and_test.yml
+        # file in json strings with major and full keys.
+        # Below command extracts the versions and gets the unique values.
+        pg_versions=$(cat .github/workflows/build_and_test.yml | grep -oE '"major": "[0-9]+", "full": "[0-9.]+"' | sed -E 's/"major": "([0-9]+)", "full": "([0-9.]+)"/\1/g' | sort | uniq | tr '\n', ',')
         pg_versions_array="[ ${pg_versions} ]"
         echo "Supported PG Versions: ${pg_versions_array}"
         # Below line is needed to set the output variable to be used in the next job
         echo "pg_versions=${pg_versions_array}" >> $GITHUB_OUTPUT
+      shell: bash
   rpm_build_tests:
     name: rpm_build_tests
     needs: get_postgres_versions_from_file
@@ -245,6 +245,7 @@ CREATE TABLE country_codes (
     country_code VARCHAR(3) PRIMARY KEY,
     country_name VARCHAR(50)
 );
+SELECT create_reference_table('country_codes');

 -- Reference Table: Order Status
 CREATE TABLE order_status (
@@ -269,14 +270,17 @@ The aim of this planner is to avoid relying on PostgreSQL's standard_planner() f

 ### Main C Functions Involved:

-- `FastPathRouterPlan()`: The primary function for creating the fast-path query plan.
+- `FastPathPlanner()`: The primary function for creating the fast-path query plan.
 - `FastPathRouterQuery()`: Validates if a query is eligible for fast-path routing by checking its structure and the WHERE clause.

 With `SET client_min_messages TO debug4;` you should see the following in the DEBUG messages: "DEBUG: Distributed planning for a fast-path router query"

 ```sql
 -- Fetches the count of users born in the same year, but only
--- for a single country
+-- for a single country, with a filter on the distribution column
+-- Normally we have a single user with id = 15 because it's a PRIMARY KEY
+-- this is just to demonstrate that fast-path can handle complex queries
+-- with EXTRACT(), COUNT(), GROUP BY, HAVING, etc.
 SELECT EXTRACT(YEAR FROM date_of_birth) as birth_year, COUNT(*)
 FROM users_table
 WHERE country_code = 'USA' AND user_id = 15
@@ -382,11 +386,10 @@ FROM users_table u, orders_table o
 WHERE u.user_id = o.user_id AND u.user_id = 42;

 -- With Subqueries:

 -- Fetch the username and their total order amount
 -- for a specific user
 SELECT u.username,
-       (SELECT MAX(o.product_id) FROM orders_table o
+       (SELECT COUNT(*) FROM orders_table o
         WHERE o.user_id = 42 AND
               o.user_id = u.user_id)
 FROM users_table u
@@ -692,7 +695,7 @@ Assume that there are two subqueries; each subquery is individually joined on th
 -- The join condition between them is: sub1.user_id != sub2.user_id, which does not preserve distribution key equality.
 -- Citus qualifies sub1 as the anchor subquery and checks whether all other subqueries are joined on the distribution key.
 -- In this case, sub2 is not joined on the distribution key, so Citus decides to recursively plan the whole sub2.
-SELECT a.user_id, b.user_id
+SELECT sub1.user_id, sub2.user_id
 FROM (
     SELECT u.user_id
     FROM users_table u
@@ -884,7 +887,7 @@ Citus has a rules-based optimizer. The core function `MultiLogicalPlanCreate()`

 For instance, one simple optimization pushes the "filter" operation below the "MultiCollect." Such rules are defined in the function `Commutative()` in `multi_logical_optimizer.c`.

-The most interesting part of the optimizer is usually in the final stage, when handling the more complex operators (GROUP BY, DISTINCT window functions, ORDER BY, aggregates). These operators are conjoined in a `MultiExtendedOpNode`. In many cases, they can only partially be pushed down into the worker nodes, which results in one `MultiExtendedOpNode` above the `MultiCollection` (which will run on the coordinator and aggregate across worker nodes), and another `MultiExtendedOpNode` below the `MultiCollect` (which will be pushed down to worker nodes). The bulk of the logic for generating the two nodes lives in `MasterExtendedOpNode()` and `WorkerExtendedOpNode()`, respectively.
+The most interesting part of the optimizer is usually in the final stage, when handling the more complex operators (GROUP BY, DISTINCT window functions, ORDER BY, aggregates). These operators are conjoined in a `MultiExtendedOpNode`. In many cases, they can only partially be pushed down into the worker nodes, which results in one `MultiExtendedOpNode` above the `MultiCollect` (which will run on the coordinator and aggregate across worker nodes), and another `MultiExtendedOpNode` below the `MultiCollect` (which will be pushed down to worker nodes). The bulk of the logic for generating the two nodes lives in `MasterExtendedOpNode()` and `WorkerExtendedOpNode()`, respectively.

 ##### Aggregate functions

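To make the two-node split concrete, here is a hedged sketch of how an average is decomposed; it is not taken from the repository, and it assumes a numeric `total` column on `orders_table` plus a shard name and intermediate relation name that are purely illustrative:

```sql
-- Hypothetical decomposition of: SELECT avg(total) FROM orders_table;
-- Worker-side MultiExtendedOpNode (below MultiCollect), run per shard:
SELECT sum(total) AS psum, count(total) AS pcount FROM orders_table_102041;
-- Coordinator-side MultiExtendedOpNode (above MultiCollect), over the
-- collected partial rows from all shards:
SELECT sum(psum) / sum(pcount) FROM collected_partials;
```

The design point is that `avg()` itself is not distributive, but `sum()` and `count()` are, so only the cheap final division has to run on the coordinator.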
@@ -1034,8 +1037,8 @@ SELECT * FROM cte_1;
 -- but as the same cte used twice
 -- Citus converts the CTE to intermediate result
 WITH cte_1 AS (SELECT DISTINCT user_id FROM orders_table)
-SELECT * FROM cte_1 as c1 JOIN
-             cte_1 as c2 USING (user_id);
+SELECT * FROM cte_1 as c1
+JOIN cte_1 as c2 USING (user_id);
 ```

 - **Citus Specific Materialization**:
@@ -1051,8 +1054,7 @@ As of writing this document, Citus does NOT support

 ```sql
 WITH users_that_have_orders AS (SELECT users_table.* FROM users_table JOIN orders_table USING (user_id))
-SELECT
-    max(date_of_birth)
+SELECT max(date_of_birth)
 FROM users_that_have_orders
 GROUP BY GROUPING SETS (user_id, email);
 ...
@@ -1099,7 +1101,7 @@ INSERT INTO orders_table (order_id, user_id) VALUES
 ```

 **Debug Info**:
-Debug information shows how the query is rebuilt for different user_ids.
+Debug information shows how the query is rebuilt for different user_ids. Here, the shard_count is 4.
 ```sql
 -- for user_id: 1
 DEBUG: query after rebuilding: INSERT INTO public.orders_table_102041 AS citus_table_alias (order_id, user_id) VALUES ('1'::bigint,'1'::bigint), ('3'::bigint,'1'::bigint)
@@ -1133,7 +1135,7 @@ DEBUG: query after rebuilding: INSERT INTO public.orders_table_102064 AS citus
 **Examples**:
 The following section will delve into examples, starting with simple ones and moving to more complex scenarios.

-### INSERT.. SELECT Advanced Scenarios
+### INSERT.. SELECT Query Planning

 **Overview**:
 The `INSERT .. SELECT` pushdown logic builds upon the pushdown planning for `SELECT` commands. The key requirements include colocated tables and matching distribution columns. Relevant C functions are `CreateDistributedInsertSelectPlan`, `DistributedInsertSelectSupported()`, and `AllDistributionKeysInQueryAreEqual`.
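As an illustration of those requirements, here is a hedged sketch, reusing `users_table` and `orders_table` from the examples above and assuming they are colocated and both distributed on `user_id`; the `order_id` derivation is made up:

```sql
-- The SELECT's distribution column (user_id) feeds the target's distribution
-- column, and the tables are colocated, so each worker can execute its
-- shard's INSERT .. SELECT locally with no pull-up to the coordinator.
INSERT INTO orders_table (order_id, user_id)
SELECT user_id * 100000, user_id  -- hypothetical order_id derivation
FROM users_table
WHERE country_code = 'USA';
```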
@@ -1267,7 +1269,7 @@ WHERE user_id IN (SELECT user_id FROM high_value_users);
 Used for more complex queries, like those with subqueries or joins that can't be pushed down. The queries are planned recursively.
 ```sql
 DELETE FROM users_table WHERE user_id
-IN (SELECT user_id FROM orders_table WHERE total > 100 ORDER BY total DESC LIMIT 5);
+IN (SELECT user_id FROM orders_table WHERE order_date < '2023-01-01' ORDER BY order_date LIMIT 5);
 ```

 ### Correlated/Lateral Subqueries in Planning
@@ -1279,8 +1281,7 @@ Correlated or LATERAL subqueries have special behavior in Citus. They can often
 **Key Code Details**:
 For more information on the code, check the following functions:
 `DeferErrorIfCannotPushdownSubquery()` ->
-`ContainsReferencesToOuterQuery()` ->
-`DeferErrorIfSubqueryRequiresMerge()`.
+`ContainsReferencesToOuterQuery()`, `DeferErrorIfSubqueryRequiresMerge()`, `DeferredErrorIfUnsupportedLateralSubquery()`. LATERAL queries are different/unique: even if the subquery requires a merge step such as a `LIMIT`, if the correlation is on the distribution column, we can push it down. See [#4385](https://github.com/citusdata/citus/pull/4385).

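A hedged sketch of that LATERAL case, with table and column names reused from the earlier examples rather than from Citus itself: the subquery carries a `LIMIT`, which would normally require a merge step, but the correlation is on the distribution column `user_id`, so it can still be pushed down:

```sql
-- The LIMIT inside the lateral subquery would normally force a merge step,
-- but the correlation o.user_id = u.user_id is on the distribution column,
-- so each shard can pair its users with its own colocated orders.
SELECT u.username, recent.order_id
FROM users_table u,
     LATERAL (
         SELECT o.order_id
         FROM orders_table o
         WHERE o.user_id = u.user_id
         ORDER BY o.order_date DESC
         LIMIT 3
     ) AS recent;
```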
@@ -1409,7 +1410,7 @@ WITH recent_orders AS (
 )
 SELECT u.*
 FROM users_table u
-JOIN recent_orders o ON u.user_id = o.product_id;
+JOIN recent_orders o ON u.user_id = o.product_id
 JOIN orders_table o2 ON o2.product_id = o.product_id;
 ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns
 ```
@@ -521,8 +521,7 @@ IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEntry, CitusTableType tabl


 /*
- * HasDistributionKey returs true if given Citus table doesn't have a
- * distribution key.
+ * HasDistributionKey returns true if given Citus table has a distribution key.
  */
 bool
 HasDistributionKey(Oid relationId)
@@ -538,8 +537,8 @@ HasDistributionKey(Oid relationId)


 /*
- * HasDistributionKey returs true if given cache entry identifies a Citus
- * table that doesn't have a distribution key.
+ * HasDistributionKeyCacheEntry returns true if given cache entry identifies a
+ * Citus table that has a distribution key.
  */
 bool
 HasDistributionKeyCacheEntry(CitusTableCacheEntry *tableEntry)
@@ -154,7 +154,7 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
  * being a fast path router query.
  * The requirements for the fast path query can be listed below:
  *
- *   - SELECT query without CTES, sublinks-subqueries, set operations
+ *   - SELECT/UPDATE/DELETE query without CTES, sublinks-subqueries, set operations
  *   - The query should touch only a single hash distributed or reference table
  *   - The distribution with equality operator should be in the WHERE clause
  *     and it should be ANDed with any other filters. Also, the distribution
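For illustration, a hedged example of a modification that now meets these fast-path requirements; the table and column names come from the documentation examples above, not from this source file:

```sql
-- Single hash-distributed table, no CTEs/subqueries/set operations, and an
-- equality filter on the distribution column ANDed with the other filter.
UPDATE users_table
SET username = 'renamed'
WHERE user_id = 42 AND country_code = 'USA';
```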
@@ -2324,27 +2324,11 @@ PlanRouterQuery(Query *originalQuery,
 		TargetShardIntervalForFastPathQuery(originalQuery, &isMultiShardQuery,
 											distributionKeyValue,
 											partitionValueConst);
+		Assert(!isMultiShardQuery);

-		/*
-		 * This could only happen when there is a parameter on the distribution key.
-		 * We defer error here, later the planner is forced to use a generic plan
-		 * by assigning arbitrarily high cost to the plan.
-		 */
-		if (UpdateOrDeleteOrMergeQuery(originalQuery) && isMultiShardQuery)
-		{
-			planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
-										  "Router planner cannot handle multi-shard "
-										  "modify queries", NULL, NULL);
-			return planningError;
-		}
-
 		*prunedShardIntervalListList = shardIntervalList;
-
-		if (!isMultiShardQuery)
-		{
-			ereport(DEBUG2, (errmsg("Distributed planning for a fast-path router "
-									"query")));
-		}
+		ereport(DEBUG2, (errmsg("Distributed planning for a fast-path router "
+								"query")));
 	}
 	else
 	{
@@ -481,6 +481,7 @@ _PG_init(void)
 #endif

 	InitializeMaintenanceDaemon();
+	InitializeMaintenanceDaemonForMainDb();

 	/* initialize coordinated transaction management */
 	InitializeTransactionManagement();
@@ -1820,6 +1821,16 @@ RegisterCitusConfigVariables(void)
 		GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE | GUC_UNIT_MS,
 		NULL, NULL, NULL);

+	DefineCustomStringVariable(
+		"citus.main_db",
+		gettext_noop("Which database is designated as the main_db"),
+		NULL,
+		&MainDb,
+		"",
+		PGC_POSTMASTER,
+		GUC_STANDARD,
+		NULL, NULL, NULL);
+
 	DefineCustomIntVariable(
 		"citus.max_adaptive_executor_pool_size",
 		gettext_noop("Sets the maximum number of connections per worker node used by "
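Because the GUC is registered with `PGC_POSTMASTER`, it can only take effect at server start. A hedged usage sketch (the database name is illustrative; the Python test further below exercises the same flow through its `configure()` helper):

```sql
-- Persist the setting, then restart the server for it to take effect:
ALTER SYSTEM SET citus.main_db TO 'mymaindb';
-- After restart, verify:
SHOW citus.main_db;
```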
@@ -99,6 +99,7 @@ int Recover2PCInterval = 60000;
 int DeferShardDeleteInterval = 15000;
 int BackgroundTaskQueueCheckInterval = 5000;
 int MaxBackgroundTaskExecutors = 4;
+char *MainDb = "";

 /* config variables for metadata sync timeout */
 int MetadataSyncInterval = 60000;
@@ -112,7 +113,7 @@ static MaintenanceDaemonControlData *MaintenanceDaemonControl = NULL;
  * activated.
  */
 static HTAB *MaintenanceDaemonDBHash;
-
+static ErrorContextCallback errorCallback = { 0 };
 static volatile sig_atomic_t got_SIGHUP = false;
 static volatile sig_atomic_t got_SIGTERM = false;

@@ -125,6 +126,8 @@ static void MaintenanceDaemonShmemExit(int code, Datum arg);
 static void MaintenanceDaemonErrorContext(void *arg);
 static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData);
 static void WarnMaintenanceDaemonNotStarted(void);
+static MaintenanceDaemonDBData * GetMaintenanceDaemonDBHashEntry(Oid databaseId,
+																 bool *found);

 /*
  * InitializeMaintenanceDaemon, called at server start, is responsible for
@@ -139,6 +142,82 @@ InitializeMaintenanceDaemon(void)
 }


+/*
+ * GetMaintenanceDaemonDBHashEntry searches the MaintenanceDaemonDBHash for the
+ * databaseId. It returns the entry if found or creates a new entry and
+ * initializes the value with zeroes.
+ */
+MaintenanceDaemonDBData *
+GetMaintenanceDaemonDBHashEntry(Oid databaseId, bool *found)
+{
+	MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search(
+		MaintenanceDaemonDBHash,
+		&databaseId,
+		HASH_ENTER_NULL,
+		found);
+
+	if (!dbData)
+	{
+		elog(LOG,
+			 "cannot create or find the maintenance daemon hash entry for database %u",
+			 databaseId);
+		return NULL;
+	}
+
+	if (!*found)
+	{
+		/* ensure the values in MaintenanceDaemonDBData are zero */
+		memset(((char *) dbData) + sizeof(Oid), 0,
+			   sizeof(MaintenanceDaemonDBData) - sizeof(Oid));
+	}
+
+	return dbData;
+}
+
+
+/*
+ * InitializeMaintenanceDaemonForMainDb is called in _PG_init,
+ * at which stage we are not in a transaction and do not have a databaseOid.
+ */
+void
+InitializeMaintenanceDaemonForMainDb(void)
+{
+	if (strcmp(MainDb, "") == 0)
+	{
+		elog(LOG, "There is no designated Main database.");
+		return;
+	}
+
+	BackgroundWorker worker;
+
+	memset(&worker, 0, sizeof(worker));
+
+	strcpy_s(worker.bgw_name, sizeof(worker.bgw_name),
+			 "Citus Maintenance Daemon for Main DB");
+
+	/* request ability to connect to target database */
+	worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+
+	/*
+	 * No point in getting started before able to run query, but we do
+	 * want to get started on Hot-Standby.
+	 */
+	worker.bgw_start_time = BgWorkerStart_ConsistentState;
+
+	/* Restart after a bit after errors, but don't bog the system. */
+	worker.bgw_restart_time = 5;
+	strcpy_s(worker.bgw_library_name,
+			 sizeof(worker.bgw_library_name), "citus");
+	strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_function_name),
+			 "CitusMaintenanceDaemonMain");
+
+	worker.bgw_main_arg = (Datum) 0;
+
+	RegisterBackgroundWorker(&worker);
+}
+
+
 /*
  * InitializeMaintenanceDaemonBackend, called at backend start and
  * configuration changes, is responsible for starting a per-database
@@ -148,31 +227,20 @@ void
 InitializeMaintenanceDaemonBackend(void)
 {
 	Oid extensionOwner = CitusExtensionOwner();
-	bool found;
+	bool found = false;

 	LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);

-	MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search(
-		MaintenanceDaemonDBHash,
-		&MyDatabaseId,
-		HASH_ENTER_NULL,
-		&found);
+	MaintenanceDaemonDBData *dbData = GetMaintenanceDaemonDBHashEntry(MyDatabaseId,
+																	  &found);

 	if (dbData == NULL)
 	{
 		WarnMaintenanceDaemonNotStarted();
 		LWLockRelease(&MaintenanceDaemonControl->lock);

 		return;
 	}

-	if (!found)
-	{
-		/* ensure the values in MaintenanceDaemonDBData are zero */
-		memset(((char *) dbData) + sizeof(Oid), 0,
-			   sizeof(MaintenanceDaemonDBData) - sizeof(Oid));
-	}
-
 	if (IsMaintenanceDaemon)
 	{
 		/*
@@ -271,66 +339,97 @@ WarnMaintenanceDaemonNotStarted(void)


 /*
- * CitusMaintenanceDaemonMain is the maintenance daemon's main routine, it'll
- * be started by the background worker infrastructure. If it errors out,
- * it'll be restarted after a few seconds.
+ * ConnectToDatabase connects to the database for the given databaseOid.
+ * If databaseOid is 0, connects to MainDb and then creates a hash entry.
+ * If a hash entry cannot be created for MainDb it exits the process requesting a restart.
+ * However for regular databases, it exits without requesting a restart since another
+ * subsequent backend is expected to start the Maintenance Daemon.
+ * If the found hash entry has a valid workerPid, it exits
+ * without requesting a restart since there is already a daemon running.
  */
-void
-CitusMaintenanceDaemonMain(Datum main_arg)
+static MaintenanceDaemonDBData *
+ConnectToDatabase(Oid databaseOid)
 {
-	Oid databaseOid = DatumGetObjectId(main_arg);
-	TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY =
-		TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000);
-	bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false;
-	TimestampTz lastRecoveryTime = 0;
-	TimestampTz lastShardCleanTime = 0;
-	TimestampTz lastStatStatementsPurgeTime = 0;
-	TimestampTz nextMetadataSyncTime = 0;
-
-	/* state kept for the background tasks queue monitor */
-	TimestampTz lastBackgroundTaskQueueCheck = GetCurrentTimestamp();
-	BackgroundWorkerHandle *backgroundTasksQueueBgwHandle = NULL;
-	bool backgroundTasksQueueWarnedForLock = false;
+	MaintenanceDaemonDBData *myDbData = NULL;

-	/*
-	 * We do metadata sync in a separate background worker. We need its
-	 * handle to be able to check its status.
-	 */
-	BackgroundWorkerHandle *metadataSyncBgwHandle = NULL;
+	bool isMainDb = false;

-	/*
-	 * Look up this worker's configuration.
-	 */
 	LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);

-	MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *)
-		hash_search(MaintenanceDaemonDBHash, &databaseOid,
-					HASH_FIND, NULL);
-	if (!myDbData)
+	if (databaseOid == 0)
 	{
-		/*
-		 * When the database crashes, background workers are restarted, but
-		 * the state in shared memory is lost. In that case, we exit and
-		 * wait for a session to call InitializeMaintenanceDaemonBackend
-		 * to properly add it to the hash.
-		 */
-		proc_exit(0);
+		char *databaseName = MainDb;
+
+		/*
+		 * Since we cannot query databaseOid without initializing Postgres
+		 * first, connect to the database by name.
+		 */
+		BackgroundWorkerInitializeConnection(databaseName, NULL, 0);
+
+		/*
+		 * Now we have a valid MyDatabaseId.
+		 * Insert the hash entry for the database to the Maintenance Daemon Hash.
+		 */
+		bool found = false;
+
+		myDbData = GetMaintenanceDaemonDBHashEntry(MyDatabaseId, &found);
+
+		if (!myDbData)
+		{
+			/*
+			 * If an entry cannot be created, a return code of 1 requests a
+			 * worker restart. Since the BackgroundWorker for the MainDb is
+			 * only registered once during server startup, we need to retry.
+			 */
+			proc_exit(1);
+		}
+
+		if (found && myDbData->workerPid != 0)
+		{
+			/* another maintenance daemon is already running */
+			proc_exit(0);
+		}
+
+		databaseOid = MyDatabaseId;
+		myDbData->userOid = GetSessionUserId();
+		isMainDb = true;
 	}
-
-	if (myDbData->workerPid != 0)
+	else
 	{
-		/*
-		 * Another maintenance daemon is running. This usually happens because
-		 * postgres restarts the daemon after an non-zero exit, and
-		 * InitializeMaintenanceDaemonBackend started one before postgres did.
-		 * In that case, the first one stays and the last one exits.
-		 */
-		proc_exit(0);
+		myDbData = (MaintenanceDaemonDBData *)
+			hash_search(MaintenanceDaemonDBHash, &databaseOid,
+						HASH_FIND, NULL);
+
+		if (!myDbData)
+		{
+			/*
+			 * When the database crashes, background workers are restarted, but
+			 * the state in shared memory is lost. In that case, we exit and
+			 * wait for a session to call InitializeMaintenanceDaemonBackend
+			 * to properly add it to the hash.
+			 */
+			proc_exit(0);
+		}
+
+		if (myDbData->workerPid != 0)
+		{
+			/*
+			 * Another maintenance daemon is running. This usually happens
+			 * because postgres restarts the daemon after a non-zero exit, and
+			 * InitializeMaintenanceDaemonBackend started one before postgres
+			 * did. In that case, the first one stays and the last one exits.
+			 */
+			proc_exit(0);
+		}
 	}

-	before_shmem_exit(MaintenanceDaemonShmemExit, main_arg);
+	before_shmem_exit(MaintenanceDaemonShmemExit, ObjectIdGetDatum(databaseOid));

 	/*
 	 * Signal that I am the maintenance daemon now.
@@ -356,25 +455,55 @@ CitusMaintenanceDaemonMain(Datum main_arg)

 	LWLockRelease(&MaintenanceDaemonControl->lock);

-	/*
-	 * Setup error context so log messages can be properly attributed. Some of
-	 * them otherwise sound like they might be from a normal user connection.
-	 * Do so before setting up signals etc, so we never exit without the
-	 * context setup.
-	 */
-	ErrorContextCallback errorCallback = { 0 };
 	memset(&errorCallback, 0, sizeof(errorCallback));
 	errorCallback.callback = MaintenanceDaemonErrorContext;
 	errorCallback.arg = (void *) myDbData;
 	errorCallback.previous = error_context_stack;
 	error_context_stack = &errorCallback;


 	elog(LOG, "starting maintenance daemon on database %u user %u",
 		 databaseOid, myDbData->userOid);

-	/* connect to database, after that we can actually access catalogs */
-	BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid, 0);
+	if (!isMainDb)
+	{
+		/* connect to database, after that we can actually access catalogs */
+		BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid, 0);
+	}
+
+	return myDbData;
+}
+
+
+/*
+ * CitusMaintenanceDaemonMain is the maintenance daemon's main routine, it'll
+ * be started by the background worker infrastructure. If it errors out,
+ * it'll be restarted after a few seconds.
+ */
+void
+CitusMaintenanceDaemonMain(Datum main_arg)
+{
+	Oid databaseOid = DatumGetObjectId(main_arg);
+	TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY =
+		TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000);
+	bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false;
+	TimestampTz lastRecoveryTime = 0;
+	TimestampTz lastShardCleanTime = 0;
+	TimestampTz lastStatStatementsPurgeTime = 0;
+	TimestampTz nextMetadataSyncTime = 0;
+
+	/* state kept for the background tasks queue monitor */
+	TimestampTz lastBackgroundTaskQueueCheck = GetCurrentTimestamp();
+	BackgroundWorkerHandle *backgroundTasksQueueBgwHandle = NULL;
+	bool backgroundTasksQueueWarnedForLock = false;
+
+	/*
+	 * We do metadata sync in a separate background worker. We need its
+	 * handle to be able to check its status.
+	 */
+	BackgroundWorkerHandle *metadataSyncBgwHandle = NULL;
+
+	MaintenanceDaemonDBData *myDbData = ConnectToDatabase(databaseOid);

 	/* make worker recognizable in pg_stat_activity */
 	pgstat_report_appname("Citus Maintenance Daemon");
@@ -383,7 +512,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
 	 * Terminate orphaned metadata sync daemons spawned from previously terminated
 	 * or crashed maintenanced instances.
 	 */
-	SignalMetadataSyncDaemon(databaseOid, SIGTERM);
+	SignalMetadataSyncDaemon(MyDatabaseId, SIGTERM);

 	/* enter main loop */
 	while (!got_SIGTERM)
@@ -945,7 +1074,7 @@ MaintenanceDaemonShmemExit(int code, Datum arg)
 }


-/* MaintenanceDaemonSigTermHandler calls proc_exit(0) */
+/* MaintenanceDaemonSigTermHandler sets the got_SIGTERM flag. */
 static void
 MaintenanceDaemonSigTermHandler(SIGNAL_ARGS)
 {
@@ -20,6 +20,7 @@

 /* config variable for */
 extern double DistributedDeadlockDetectionTimeoutFactor;
+extern char *MainDb;

 extern void StopMaintenanceDaemon(Oid databaseId);
 extern void TriggerNodeMetadataSync(Oid databaseId);
@@ -27,6 +28,7 @@ extern void InitializeMaintenanceDaemon(void);
 extern size_t MaintenanceDaemonShmemSize(void);
 extern void MaintenanceDaemonShmemInit(void);
 extern void InitializeMaintenanceDaemonBackend(void);
+extern void InitializeMaintenanceDaemonForMainDb(void);
 extern bool LockCitusExtension(void);

 extern PGDLLEXPORT void CitusMaintenanceDaemonMain(Datum main_arg);
@@ -222,7 +222,7 @@ s/(CONTEXT: PL\/pgSQL function .* line )([0-9]+)/\1XX/g
 s/^(PL\/pgSQL function .* line) [0-9]+ (.*)/\1 XX \2/g

 # normalize a test difference in multi_move_mx
-s/ connection to server at "\w+" \(127\.0\.0\.1\), port [0-9]+ failed://g
+s/ connection to server at "\w+" (\(127\.0\.0\.1\)|\(::1\)), port [0-9]+ failed://g

 # normalize differences in tablespace of new index
 s/pg14\.idx.*/pg14\.xxxxx/g
@@ -453,6 +453,9 @@ def cleanup_test_leftovers(nodes):
     for node in nodes:
         node.cleanup_schemas()

+    for node in nodes:
+        node.cleanup_databases()
+
     for node in nodes:
         node.cleanup_users()

@@ -753,6 +756,7 @@ class Postgres(QueryRunner):
         self.subscriptions = set()
         self.publications = set()
         self.replication_slots = set()
+        self.databases = set()
         self.schemas = set()
         self.users = set()

@@ -993,6 +997,10 @@ class Postgres(QueryRunner):
             args = sql.SQL("")
         self.sql(sql.SQL("CREATE USER {} {}").format(sql.Identifier(name), args))

+    def create_database(self, name):
+        self.databases.add(name)
+        self.sql(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(name)))
+
     def create_schema(self, name):
         self.schemas.add(name)
         self.sql(sql.SQL("CREATE SCHEMA {}").format(sql.Identifier(name)))
@@ -1020,6 +1028,12 @@ class Postgres(QueryRunner):
         for user in self.users:
             self.sql(sql.SQL("DROP USER IF EXISTS {}").format(sql.Identifier(user)))

+    def cleanup_databases(self):
+        for database in self.databases:
+            self.sql(
+                sql.SQL("DROP DATABASE IF EXISTS {}").format(sql.Identifier(database))
+            )
+
     def cleanup_schemas(self):
         for schema in self.schemas:
             self.sql(
@@ -0,0 +1,74 @@
+# This test checks that once citus.main_db is set and the
+# server is restarted, a Citus Maintenance Daemon for the main_db
+# is launched. This should happen even if there is no query run
+# in main_db yet.
+import time
+
+
+def wait_until_maintenance_daemons_start(daemoncount, cluster):
+    i = 0
+    n = 0
+
+    while i < 10:
+        i += 1
+        n = cluster.coordinator.sql_value(
+            "SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon';"
+        )
+
+        if n == daemoncount:
+            break
+
+        time.sleep(0.1)
+
+    assert n == daemoncount
+
+
+def test_set_maindb(cluster_factory):
+    cluster = cluster_factory(0)
+
+    # Test that once citus.main_db is set to a database name
+    # there are two maintenance daemons running upon restart.
+    # One maintenance daemon for the database of the current connection
+    # and one for the citus.main_db.
+    cluster.coordinator.create_database("mymaindb")
+    cluster.coordinator.configure("citus.main_db='mymaindb'")
+    cluster.coordinator.restart()
+
+    assert cluster.coordinator.sql_value("SHOW citus.main_db;") == "mymaindb"
+
+    wait_until_maintenance_daemons_start(2, cluster)
+
+    assert (
+        cluster.coordinator.sql_value(
+            "SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon' AND datname='mymaindb';"
+        )
+        == 1
+    )
+
+    # Test that once citus.main_db is set to the empty string
+    # there is only one maintenance daemon, for the database
+    # of the current connection.
+    cluster.coordinator.configure("citus.main_db=''")
+    cluster.coordinator.restart()
+    assert cluster.coordinator.sql_value("SHOW citus.main_db;") == ""
+
+    wait_until_maintenance_daemons_start(1, cluster)
+
+    # Test that after the citus.main_db database is dropped, the
+    # maintenance daemon for that database is terminated.
+    cluster.coordinator.configure("citus.main_db='mymaindb'")
+    cluster.coordinator.restart()
+    assert cluster.coordinator.sql_value("SHOW citus.main_db;") == "mymaindb"
+
+    wait_until_maintenance_daemons_start(2, cluster)
+
+    cluster.coordinator.sql("DROP DATABASE mymaindb;")
+
+    wait_until_maintenance_daemons_start(1, cluster)
+
+    assert (
+        cluster.coordinator.sql_value(
+            "SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon' AND datname='mymaindb';"
+        )
+        == 0
+    )