Merge branch 'create_alter_database' into alter_database_additional_options

pull/7253/head
Gürkan İndibay 2023-10-14 08:15:45 +03:00 committed by GitHub
commit c38e856f13
25 changed files with 871 additions and 160 deletions


@@ -698,7 +698,6 @@ jobs:
 workflows:
   version: 2
   flaky_test_debugging:
-    when: << pipeline.parameters.flaky_test >>
     jobs:
      - build:
          name: build-flaky-15
@@ -714,8 +713,6 @@ workflows:
          runs: << pipeline.parameters.flaky_test_runs_per_job >>
   build_and_test:
-    when:
-      not: << pipeline.parameters.flaky_test >>
     jobs:
      - build:
          name: build-14


@@ -36,6 +36,10 @@ RUN apt update && apt install -y \
     && add-apt-repository ppa:deadsnakes/ppa -y \
     && apt install -y \
         python3.9-full \
+    # software properties pulls in pkexec, which makes the debugger unusable in vscode
+    && apt purge -y \
+        software-properties-common \
+    && apt autoremove -y \
     && apt clean
 RUN sudo pip3 install pipenv pipenv-shebang
@@ -109,7 +113,7 @@ WORKDIR /uncrustify/uncrustify-uncrustify-0.68.1/
 RUN mkdir build
 WORKDIR /uncrustify/uncrustify-uncrustify-0.68.1/build/
 RUN cmake ..
-RUN make -sj8
+RUN MAKEFLAGS="-j $(nproc)" make -s
 RUN make install DESTDIR=/uncrustify


@@ -0,0 +1,23 @@
name: 'Parallelization matrix'
inputs:
count:
required: false
default: 32
outputs:
json:
value: ${{ steps.generate_matrix.outputs.json }}
runs:
using: "composite"
steps:
- name: Generate parallelization matrix
id: generate_matrix
shell: bash
run: |-
json_array="{\"include\": ["
for ((i = 1; i <= ${{ inputs.count }}; i++)); do
json_array+="{\"id\":\"$i\"},"
done
json_array=${json_array%,}
json_array+=" ]}"
echo "json=$json_array" >> "$GITHUB_OUTPUT"
echo "json=$json_array"


@@ -0,0 +1,38 @@
name: save_logs_and_results
inputs:
folder:
required: false
default: "log"
runs:
using: composite
steps:
- uses: actions/upload-artifact@v3.1.1
name: Upload logs
with:
name: ${{ inputs.folder }}
if-no-files-found: ignore
path: |
src/test/**/proxy.output
src/test/**/results/
src/test/**/tmp_check/master/log
src/test/**/tmp_check/worker.57638/log
src/test/**/tmp_check/worker.57637/log
src/test/**/*.diffs
src/test/**/out/ddls.sql
src/test/**/out/queries.sql
src/test/**/logfile_*
/tmp/pg_upgrade_newData_logs
- name: Publish regression.diffs
run: |-
diffs="$(find src/test/regress -name "*.diffs" -exec cat {} \;)"
if ! [ -z "$diffs" ]; then
echo '```diff' >> $GITHUB_STEP_SUMMARY
echo -E "$diffs" >> $GITHUB_STEP_SUMMARY
echo '```' >> $GITHUB_STEP_SUMMARY
echo -E $diffs
fi
shell: bash
- name: Print stack traces
run: "./ci/print_stack_trace.sh"
if: failure()
shell: bash


@@ -0,0 +1,35 @@
name: setup_extension
inputs:
pg_major:
required: false
skip_installation:
required: false
default: false
type: boolean
runs:
using: composite
steps:
- name: Expose $PG_MAJOR to Github Env
run: |-
if [ -z "${{ inputs.pg_major }}" ]; then
echo "PG_MAJOR=${PG_MAJOR}" >> $GITHUB_ENV
else
echo "PG_MAJOR=${{ inputs.pg_major }}" >> $GITHUB_ENV
fi
shell: bash
- uses: actions/download-artifact@v3.0.1
with:
name: build-${{ env.PG_MAJOR }}
- name: Install Extension
if: ${{ inputs.skip_installation == 'false' }}
run: tar xfv "install-$PG_MAJOR.tar" --directory /
shell: bash
- name: Configure
run: |-
chown -R circleci .
git config --global --add safe.directory ${GITHUB_WORKSPACE}
gosu circleci ./configure --without-pg-version-check
shell: bash
- name: Enable core dumps
run: ulimit -c unlimited
shell: bash


@@ -0,0 +1,27 @@
name: coverage
inputs:
flags:
required: false
codecov_token:
required: true
runs:
using: composite
steps:
- uses: codecov/codecov-action@v3
with:
flags: ${{ inputs.flags }}
token: ${{ inputs.codecov_token }}
verbose: true
gcov: true
- name: Create codeclimate coverage
run: |-
lcov --directory . --capture --output-file lcov.info
lcov --remove lcov.info -o lcov.info '/usr/*'
sed "s=^SF:$PWD/=SF:=g" -i lcov.info # relative pats are required by codeclimate
mkdir -p /tmp/codeclimate
cc-test-reporter format-coverage -t lcov -o /tmp/codeclimate/${{ inputs.flags }}.json lcov.info
shell: bash
- uses: actions/upload-artifact@v3.1.1
with:
path: "/tmp/codeclimate/*.json"
name: codeclimate

.github/workflows/build_and_test.yml (new file, 474 lines)

@@ -0,0 +1,474 @@
name: Build & Test
run-name: Build & Test - ${{ github.event.pull_request.title || github.ref_name }}
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
on:
workflow_dispatch:
inputs:
skip_test_flakyness:
required: false
default: false
type: boolean
pull_request:
types: [opened, reopened, synchronize]
jobs:
check-sql-snapshots:
runs-on: ubuntu-20.04
container:
image: ${{ vars.build_image_name }}:latest
options: --user root
steps:
- uses: actions/checkout@v3.5.0
- name: Check Snapshots
run: |
git config --global --add safe.directory ${GITHUB_WORKSPACE}
ci/check_sql_snapshots.sh
check-style:
runs-on: ubuntu-20.04
container:
image: ${{ vars.style_checker_image_name }}:${{ vars.style_checker_tools_version }}${{ vars.image_suffix }}
steps:
- name: Check Snapshots
run: |
git config --global --add safe.directory ${GITHUB_WORKSPACE}
- uses: actions/checkout@v3.5.0
with:
fetch-depth: 0
- name: Check C Style
run: citus_indent --check
- name: Check Python style
run: black --check .
- name: Check Python import order
run: isort --check .
- name: Check Python lints
run: flake8 .
- name: Fix whitespace
run: ci/editorconfig.sh && git diff --exit-code
- name: Remove useless declarations
run: ci/remove_useless_declarations.sh && git diff --cached --exit-code
- name: Normalize test output
run: ci/normalize_expected.sh && git diff --exit-code
- name: Check for C-style comments in migration files
run: ci/disallow_c_comments_in_migrations.sh && git diff --exit-code
- name: 'Check for comments that start with # character in spec files'
run: ci/disallow_hash_comments_in_spec_files.sh && git diff --exit-code
- name: Check for gitignore entries for source files
run: ci/fix_gitignore.sh && git diff --exit-code
- name: Check for lengths of changelog entries
run: ci/disallow_long_changelog_entries.sh
- name: Check for banned C API usage
run: ci/banned.h.sh
- name: Check for tests missing in schedules
run: ci/check_all_tests_are_run.sh
- name: Check if all CI scripts are actually run
run: ci/check_all_ci_scripts_are_run.sh
- name: Check if all GUCs are sorted alphabetically
run: ci/check_gucs_are_alphabetically_sorted.sh
- name: Check for missing downgrade scripts
run: ci/check_migration_files.sh
build:
name: Build for PG ${{ matrix.pg_version}}
strategy:
fail-fast: false
matrix:
image_name:
- ${{ vars.build_image_name }}
image_suffix:
- ${{ vars.image_suffix}}
pg_version:
- ${{ vars.pg14_version }}
- ${{ vars.pg15_version }}
- ${{ vars.pg16_version }}
runs-on: ubuntu-20.04
container:
image: "${{ matrix.image_name }}:${{ matrix.pg_version }}${{ matrix.image_suffix }}"
options: --user root
steps:
- uses: actions/checkout@v3.5.0
- name: Expose $PG_MAJOR to Github Env
run: echo "PG_MAJOR=${PG_MAJOR}" >> $GITHUB_ENV
shell: bash
- name: Build
run: "./ci/build-citus.sh"
shell: bash
- uses: actions/upload-artifact@v3.1.1
with:
name: build-${{ env.PG_MAJOR }}
path: |-
./build-${{ env.PG_MAJOR }}/*
./install-${{ env.PG_MAJOR }}.tar
test-citus:
name: PG${{ matrix.pg_version }} - ${{ matrix.make }}
strategy:
fail-fast: false
matrix:
suite:
- regress
image_name:
- ${{ vars.test_image_name }}
pg_version:
- ${{ vars.pg14_version }}
- ${{ vars.pg15_version }}
- ${{ vars.pg16_version }}
make:
- check-split
- check-multi
- check-multi-1
- check-multi-mx
- check-vanilla
- check-isolation
- check-operations
- check-follower-cluster
- check-columnar
- check-columnar-isolation
- check-enterprise
- check-enterprise-isolation
- check-enterprise-isolation-logicalrep-1
- check-enterprise-isolation-logicalrep-2
- check-enterprise-isolation-logicalrep-3
include:
- make: check-failure
pg_version: ${{ vars.pg14_version }}
suite: regress
image_name: ${{ vars.fail_test_image_name }}
- make: check-failure
pg_version: ${{ vars.pg15_version }}
suite: regress
image_name: ${{ vars.fail_test_image_name }}
- make: check-failure
pg_version: ${{ vars.pg16_version }}
suite: regress
image_name: ${{ vars.fail_test_image_name }}
- make: check-enterprise-failure
pg_version: ${{ vars.pg14_version }}
suite: regress
image_name: ${{ vars.fail_test_image_name }}
- make: check-enterprise-failure
pg_version: ${{ vars.pg15_version }}
suite: regress
image_name: ${{ vars.fail_test_image_name }}
- make: check-enterprise-failure
pg_version: ${{ vars.pg16_version }}
suite: regress
image_name: ${{ vars.fail_test_image_name }}
- make: check-pytest
pg_version: ${{ vars.pg14_version }}
suite: regress
image_name: ${{ vars.fail_test_image_name }}
- make: check-pytest
pg_version: ${{ vars.pg15_version }}
suite: regress
image_name: ${{ vars.fail_test_image_name }}
- make: check-pytest
pg_version: ${{ vars.pg16_version }}
suite: regress
image_name: ${{ vars.fail_test_image_name }}
- make: installcheck
suite: cdc
image_name: ${{ vars.test_image_name }}
pg_version: ${{ vars.pg15_version }}
- make: installcheck
suite: cdc
image_name: ${{ vars.test_image_name }}
pg_version: ${{ vars.pg16_version }}
- make: check-query-generator
pg_version: ${{ vars.pg14_version }}
suite: regress
image_name: ${{ vars.fail_test_image_name }}
- make: check-query-generator
pg_version: ${{ vars.pg15_version }}
suite: regress
image_name: ${{ vars.fail_test_image_name }}
- make: check-query-generator
pg_version: ${{ vars.pg16_version }}
suite: regress
image_name: ${{ vars.fail_test_image_name }}
runs-on: ubuntu-20.04
container:
image: "${{ matrix.image_name }}:${{ matrix.pg_version }}${{ vars.image_suffix }}"
options: --user root --dns=8.8.8.8
# Because GitHub creates a default network for each job, we need to use
# --dns= to get DNS settings similar to our other CI systems and local
# machines. Otherwise, we may see different results.
needs:
- build
steps:
- uses: actions/checkout@v3.5.0
- uses: "./.github/actions/setup_extension"
- name: Run Test
run: gosu circleci make -C src/test/${{ matrix.suite }} ${{ matrix.make }}
timeout-minutes: 20
- uses: "./.github/actions/save_logs_and_results"
if: always()
with:
folder: ${{ matrix.pg_version }}_${{ matrix.make }}
- uses: "./.github/actions/upload_coverage"
if: always()
with:
flags: ${{ env.PG_MAJOR }}_${{ matrix.suite }}_${{ matrix.make }}
codecov_token: ${{ secrets.CODECOV_TOKEN }}
test-arbitrary-configs:
name: PG${{ matrix.pg_version }} - check-arbitrary-configs-${{ matrix.parallel }}
runs-on: ["self-hosted", "1ES.Pool=1es-gha-citusdata-pool"]
container:
image: "${{ matrix.image_name }}:${{ matrix.pg_version }}${{ vars.image_suffix }}"
options: --user root
needs:
- build
strategy:
fail-fast: false
matrix:
image_name:
- ${{ vars.fail_test_image_name }}
pg_version:
- ${{ vars.pg14_version }}
- ${{ vars.pg15_version }}
- ${{ vars.pg16_version }}
parallel: [0,1,2,3,4,5] # workaround for running 6 parallel jobs
steps:
- uses: actions/checkout@v3.5.0
- uses: "./.github/actions/setup_extension"
- name: Test arbitrary configs
run: |-
# we use parallel jobs to split the tests into 6 parts and run them in parallel
# the script below extracts the tests for the current job
N=6 # Total number of jobs (see matrix.parallel)
X=${{ matrix.parallel }} # Current job number
TESTS=$(src/test/regress/citus_tests/print_test_names.py |
tr '\n' ',' | awk -v N="$N" -v X="$X" -F, '{
split("", parts)
for (i = 1; i <= NF; i++) {
parts[i % N] = parts[i % N] $i ","
}
print substr(parts[X], 1, length(parts[X])-1)
}')
echo $TESTS
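# Hypothetical example of the bucketing above: with N=6 and tests t1..t12,
# names are grouped by position modulo 6, so the job with X=2 ends up with
# CONFIGS="t2,t8"; every test lands in exactly one of the 6 buckets.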
gosu circleci \
make -C src/test/regress \
check-arbitrary-configs parallel=4 CONFIGS=$TESTS
- uses: "./.github/actions/save_logs_and_results"
if: always()
- uses: "./.github/actions/upload_coverage"
if: always()
with:
flags: ${{ env.pg_major }}_upgrade
codecov_token: ${{ secrets.CODECOV_TOKEN }}
test-pg-upgrade:
name: PG${{ matrix.old_pg_major }}-PG${{ matrix.new_pg_major }} - check-pg-upgrade
runs-on: ubuntu-20.04
container:
image: "${{ vars.pgupgrade_image_name }}:${{ vars.upgrade_pg_versions }}${{ vars.image_suffix }}"
options: --user root
needs:
- build
strategy:
fail-fast: false
matrix:
include:
- old_pg_major: 14
new_pg_major: 15
- old_pg_major: 15
new_pg_major: 16
- old_pg_major: 14
new_pg_major: 16
env:
old_pg_major: ${{ matrix.old_pg_major }}
new_pg_major: ${{ matrix.new_pg_major }}
steps:
- uses: actions/checkout@v3.5.0
- uses: "./.github/actions/setup_extension"
with:
pg_major: "${{ env.old_pg_major }}"
- uses: "./.github/actions/setup_extension"
with:
pg_major: "${{ env.new_pg_major }}"
- name: Install and test postgres upgrade
run: |-
gosu circleci \
make -C src/test/regress \
check-pg-upgrade \
old-bindir=/usr/lib/postgresql/${{ env.old_pg_major }}/bin \
new-bindir=/usr/lib/postgresql/${{ env.new_pg_major }}/bin
- name: Copy pg_upgrade logs for newData dir
run: |-
mkdir -p /tmp/pg_upgrade_newData_logs
if ls src/test/regress/tmp_upgrade/newData/*.log 1> /dev/null 2>&1; then
cp src/test/regress/tmp_upgrade/newData/*.log /tmp/pg_upgrade_newData_logs
fi
if: failure()
- uses: "./.github/actions/save_logs_and_results"
if: always()
- uses: "./.github/actions/upload_coverage"
if: always()
with:
flags: ${{ env.old_pg_major }}_${{ env.new_pg_major }}_upgrade
codecov_token: ${{ secrets.CODECOV_TOKEN }}
test-citus-upgrade:
name: PG${{ vars.pg14_version }} - check-citus-upgrade
runs-on: ubuntu-20.04
container:
image: "${{ vars.citusupgrade_image_name }}:${{ vars.pg14_version }}${{ vars.image_suffix }}"
options: --user root
needs:
- build
steps:
- uses: actions/checkout@v3.5.0
- uses: "./.github/actions/setup_extension"
with:
skip_installation: true
- name: Install and test citus upgrade
run: |-
# run make check-citus-upgrade for all citus versions
# the image has ${CITUS_VERSIONS} set to all versions it contains binaries for
for citus_version in ${CITUS_VERSIONS}; do \
gosu circleci \
make -C src/test/regress \
check-citus-upgrade \
bindir=/usr/lib/postgresql/${PG_MAJOR}/bin \
citus-old-version=${citus_version} \
citus-pre-tar=/install-pg${PG_MAJOR}-citus${citus_version}.tar \
citus-post-tar=${GITHUB_WORKSPACE}/install-$PG_MAJOR.tar; \
done;
# run make check-citus-upgrade-mixed for all citus versions
# the image has ${CITUS_VERSIONS} set to all versions it contains binaries for
for citus_version in ${CITUS_VERSIONS}; do \
gosu circleci \
make -C src/test/regress \
check-citus-upgrade-mixed \
citus-old-version=${citus_version} \
bindir=/usr/lib/postgresql/${PG_MAJOR}/bin \
citus-pre-tar=/install-pg${PG_MAJOR}-citus${citus_version}.tar \
citus-post-tar=${GITHUB_WORKSPACE}/install-$PG_MAJOR.tar; \
done;
- uses: "./.github/actions/save_logs_and_results"
if: always()
- uses: "./.github/actions/upload_coverage"
if: always()
with:
flags: ${{ env.pg_major }}_upgrade
codecov_token: ${{ secrets.CODECOV_TOKEN }}
upload-coverage:
if: always()
env:
CC_TEST_REPORTER_ID: ${{ secrets.CC_TEST_REPORTER_ID }}
runs-on: ubuntu-20.04
container:
image: ${{ vars.test_image_name }}:${{ vars.pg16_version }}${{ vars.image_suffix }}
needs:
- test-citus
- test-arbitrary-configs
- test-citus-upgrade
- test-pg-upgrade
steps:
- uses: actions/download-artifact@v3.0.1
with:
name: "codeclimate"
path: "codeclimate"
- name: Upload coverage results to Code Climate
run: |-
cc-test-reporter sum-coverage codeclimate/*.json -o total.json
cc-test-reporter upload-coverage -i total.json
ch_benchmark:
name: CH Benchmark
if: startsWith(github.ref, 'refs/heads/ch_benchmark/')
runs-on: ubuntu-20.04
needs:
- build
steps:
- uses: actions/checkout@v3.5.0
- uses: azure/login@v1
with:
creds: ${{ secrets.AZURE_CREDENTIALS }}
- name: install dependencies and run ch_benchmark tests
uses: azure/CLI@v1
with:
inlineScript: |
cd ./src/test/hammerdb
chmod +x run_hammerdb.sh
run_hammerdb.sh citusbot_ch_benchmark_rg
tpcc_benchmark:
name: TPCC Benchmark
if: startsWith(github.ref, 'refs/heads/tpcc_benchmark/')
runs-on: ubuntu-20.04
needs:
- build
steps:
- uses: actions/checkout@v3.5.0
- uses: azure/login@v1
with:
creds: ${{ secrets.AZURE_CREDENTIALS }}
- name: install dependencies and run tpcc_benchmark tests
uses: azure/CLI@v1
with:
inlineScript: |
cd ./src/test/hammerdb
chmod +x run_hammerdb.sh
run_hammerdb.sh citusbot_tpcc_benchmark_rg
prepare_parallelization_matrix_32:
name: Parallel 32
if: ${{ needs.test-flakyness-pre.outputs.tests != ''}}
needs: test-flakyness-pre
runs-on: ubuntu-20.04
outputs:
json: ${{ steps.parallelization.outputs.json }}
steps:
- uses: actions/checkout@v3.5.0
- uses: "./.github/actions/parallelization"
id: parallelization
with:
count: 32
test-flakyness-pre:
name: Detect regression tests that need to be run
if: ${{ !inputs.skip_test_flakyness }}
runs-on: ubuntu-20.04
needs: build
outputs:
tests: ${{ steps.detect-regression-tests.outputs.tests }}
steps:
- uses: actions/checkout@v3.5.0
with:
fetch-depth: 0
- name: Detect regression tests that need to be run
id: detect-regression-tests
run: |-
detected_changes=$(git diff origin/main... --name-only --diff-filter=AM | (grep 'src/test/regress/sql/.*\.sql\|src/test/regress/spec/.*\.spec\|src/test/regress/citus_tests/test/test_.*\.py' || true))
tests=${detected_changes}
if [ -z "$tests" ]; then
echo "No test found."
else
echo "Detected tests " $tests
fi
echo tests="$tests" >> "$GITHUB_OUTPUT"
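# Hypothetical example: a PR touching src/test/regress/sql/multi_insert.sql
# puts that path into $tests, and the test-flakyness job below re-runs it
# (8 times per matrix job) to catch flaky behavior before merge.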
test-flakyness:
if: ${{ needs.test-flakyness-pre.outputs.tests != ''}}
name: Test flakiness
runs-on: ubuntu-20.04
container:
image: ${{ vars.fail_test_image_name }}:${{ vars.pg16_version }}${{ vars.image_suffix }}
options: --user root
env:
runs: 8
needs:
- build
- test-flakyness-pre
- prepare_parallelization_matrix_32
strategy:
fail-fast: false
matrix: ${{ fromJson(needs.prepare_parallelization_matrix_32.outputs.json) }}
steps:
- uses: actions/checkout@v3.5.0
- uses: actions/download-artifact@v3.0.1
- uses: "./.github/actions/setup_extension"
- name: Run minimal tests
run: |-
tests="${{ needs.test-flakyness-pre.outputs.tests }}"
tests_array=($tests)
for test in "${tests_array[@]}"
do
test_name=$(echo "$test" | sed -r "s/.+\/(.+)\..+/\1/")
gosu circleci src/test/regress/citus_tests/run_test.py $test_name --repeat ${{ env.runs }} --use-base-schedule --use-whole-schedule-line
done
shell: bash
- uses: "./.github/actions/save_logs_and_results"
if: always()


@@ -0,0 +1,79 @@
name: Flaky test debugging
run-name: Flaky test debugging - ${{ inputs.flaky_test }} (${{ inputs.flaky_test_runs_per_job }}x${{ inputs.flaky_test_parallel_jobs }})
concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true
on:
workflow_dispatch:
inputs:
flaky_test:
required: true
type: string
description: Test to run
flaky_test_runs_per_job:
required: false
default: 8
type: number
description: Number of times to run the test
flaky_test_parallel_jobs:
required: false
default: 32
type: number
description: Number of parallel jobs to run
jobs:
build:
name: Build Citus
runs-on: ubuntu-latest
container:
image: ${{ vars.build_image_name }}:${{ vars.pg15_version }}${{ vars.image_suffix }}
options: --user root
steps:
- uses: actions/checkout@v3.5.0
- name: Configure, Build, and Install
run: |
echo "PG_MAJOR=${PG_MAJOR}" >> $GITHUB_ENV
./ci/build-citus.sh
shell: bash
- uses: actions/upload-artifact@v3.1.1
with:
name: build-${{ env.PG_MAJOR }}
path: |-
./build-${{ env.PG_MAJOR }}/*
./install-${{ env.PG_MAJOR }}.tar
prepare_parallelization_matrix:
name: Prepare parallelization matrix
runs-on: ubuntu-latest
outputs:
json: ${{ steps.parallelization.outputs.json }}
steps:
- uses: actions/checkout@v3.5.0
- uses: "./.github/actions/parallelization"
id: parallelization
with:
count: ${{ inputs.flaky_test_parallel_jobs }}
test_flakyness:
name: Test flakiness
runs-on: ubuntu-latest
container:
image: ${{ vars.fail_test_image_name }}:${{ vars.pg15_version }}${{ vars.image_suffix }}
options: --user root
needs:
[build, prepare_parallelization_matrix]
env:
test: "${{ inputs.flaky_test }}"
runs: "${{ inputs.flaky_test_runs_per_job }}"
skip: false
strategy:
fail-fast: false
matrix: ${{ fromJson(needs.prepare_parallelization_matrix.outputs.json) }}
steps:
- uses: actions/checkout@v3.5.0
- uses: "./.github/actions/setup_extension"
- name: Run minimal tests
run: |-
gosu circleci src/test/regress/citus_tests/run_test.py ${{ env.test }} --repeat ${{ env.runs }} --use-base-schedule --use-whole-schedule-line
shell: bash
- uses: "./.github/actions/save_logs_and_results"
if: always()
with:
folder: ${{ matrix.id }}


@@ -15,9 +15,6 @@ PG_MAJOR=${PG_MAJOR:?please provide the postgres major version}
 codename=${VERSION#*(}
 codename=${codename%)*}
-# get project from argument
-project="${CIRCLE_PROJECT_REPONAME}"
-
 # we'll do everything with absolute paths
 basedir="$(pwd)"
@@ -28,7 +25,7 @@ build_ext() {
   pg_major="$1"
   builddir="${basedir}/build-${pg_major}"
-  echo "Beginning build of ${project} for PostgreSQL ${pg_major}..." >&2
+  echo "Beginning build for PostgreSQL ${pg_major}..." >&2
   # do everything in a subdirectory to avoid clutter in current directory
   mkdir -p "${builddir}" && cd "${builddir}"


@@ -39,8 +39,6 @@
 static AlterOwnerStmt * RecreateAlterDatabaseOwnerStmt(Oid databaseOid);
-static List * CreateDDLTaskList(char *command, List *workerNodeList,
-                                bool outsideTransaction);
 PG_FUNCTION_INFO_V1(citus_internal_database_command);
 static Oid get_database_owner(Oid db_oid);
@@ -264,36 +262,6 @@ PreprocessAlterDatabaseRenameStmt(Node *node, const char *queryString,
 }
 
-/*
- * CreateDDLTaskList creates a task list for running a single DDL command.
- */
-static List *
-CreateDDLTaskList(char *command, List *workerNodeList, bool outsideTransaction)
-{
-    List *commandList = list_make3(DISABLE_DDL_PROPAGATION,
-                                   command,
-                                   ENABLE_DDL_PROPAGATION);
-
-    Task *task = CitusMakeNode(Task);
-    task->taskType = DDL_TASK;
-    SetTaskQueryStringList(task, commandList);
-    task->cannotBeExecutedInTransaction = outsideTransaction;
-
-    WorkerNode *workerNode = NULL;
-    foreach_ptr(workerNode, workerNodeList)
-    {
-        ShardPlacement *targetPlacement = CitusMakeNode(ShardPlacement);
-        targetPlacement->nodeName = workerNode->workerName;
-        targetPlacement->nodePort = workerNode->workerPort;
-        targetPlacement->groupId = workerNode->groupId;
-        task->taskPlacementList = lappend(task->taskPlacementList,
-                                          targetPlacement);
-    }
-
-    return list_make1(task);
-}
-
 /*
  * PreprocessAlterDatabaseSetStmt is executed before the statement is applied to the local
  * postgres instance.
@@ -333,30 +301,13 @@ PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
 List *
 PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
 {
-    if (EnableCreateDatabasePropagation)
-    {
-        EnsureCoordinator();
-    }
-
     if (!EnableCreateDatabasePropagation || !ShouldPropagate())
     {
         return NIL;
     }
 
-    CreatedbStmt *stmt = castNode(CreatedbStmt, node);
-    char *databaseName = stmt->dbname;
-    bool missingOk = false;
-    Oid databaseOid = get_database_oid(databaseName, missingOk);
-
-    /*
-     * TODO: try to reuse regular DDL infrastructure
-     *
-     * We do not do this right now because of the AssignDatabaseToShard at the end.
-     */
-    List *workerNodes = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
-                                                RowShareLock);
-    if (list_length(workerNodes) > 0)
-    {
+    EnsureCoordinator();
 
     char *createDatabaseCommand = DeparseTreeNode(node);
 
     StringInfo internalCreateCommand = makeStringInfo();
@@ -364,27 +315,11 @@ PostprocessCreateDatabaseStmt(Node *node, const char *queryString)
                      "SELECT pg_catalog.citus_internal_database_command(%s)",
                      quote_literal_cstr(createDatabaseCommand));
 
-    /*
-     * For the moment, we run CREATE DATABASE in 2PC, though that prevents
-     * us from immediately doing a pg_dump | pg_restore when dealing with
-     * a remote template database.
-     */
-    bool outsideTransaction = false;
-    List *taskList = CreateDDLTaskList(internalCreateCommand->data, workerNodes,
-                                       outsideTransaction);
-    bool localExecutionSupported = false;
-    ExecuteUtilityTaskList(taskList, localExecutionSupported);
-    }
-
-    /* synchronize pg_dist_object records */
-    ObjectAddress dbAddress = { 0 };
-    ObjectAddressSet(dbAddress, DatabaseRelationId, databaseOid);
-    MarkObjectDistributed(&dbAddress);
-
-    return NIL;
+    List *commands = list_make3(DISABLE_DDL_PROPAGATION,
+                                (void *) internalCreateCommand->data,
+                                ENABLE_DDL_PROPAGATION);
+
+    return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
 }
@@ -409,9 +344,15 @@ citus_internal_database_command(PG_FUNCTION_ARGS)
                       (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
                       GUC_ACTION_LOCAL, true, 0, false);
 
-    set_config_option("synchronous_commit", "off",
-                      (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
-                      GUC_ACTION_LOCAL, true, 0, false);
+    /*
+     * createdb() / DropDatabase() uses ParseState to report the error position for the
+     * input command and the position is reported to be 0 when it's provided as NULL.
+     * We're okay with that because we don't expect this UDF to be called with an incorrect
+     * DDL command.
+     */
+    ParseState *pstate = NULL;
 
     if (IsA(parseTree, CreatedbStmt))
     {
@@ -422,7 +363,7 @@ citus_internal_database_command(PG_FUNCTION_ARGS)
         if (!OidIsValid(databaseOid))
         {
-            createdb(NULL, (CreatedbStmt *) parseTree);
+            createdb(pstate, (CreatedbStmt *) parseTree);
         }
     }
     else if (IsA(parseTree, DropdbStmt))
@@ -435,7 +376,7 @@ citus_internal_database_command(PG_FUNCTION_ARGS)
         if (OidIsValid(databaseOid))
         {
-            DropDatabase(NULL, (DropdbStmt *) parseTree);
+            DropDatabase(pstate, (DropdbStmt *) parseTree);
         }
     }
     else if (IsA(parseTree, AlterDatabaseStmt))
@@ -473,10 +414,12 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
         return NIL;
     }
 
+    EnsureCoordinator();
+
     DropdbStmt *stmt = (DropdbStmt *) node;
-    char *databaseName = stmt->dbname;
-    bool missingOk = true;
-    Oid databaseOid = get_database_oid(databaseName, missingOk);
+    Oid databaseOid = get_database_oid(stmt->dbname, stmt->missing_ok);
 
     if (databaseOid == InvalidOid)
     {
         /* let regular ProcessUtility deal with IF NOT EXISTS */
@@ -490,12 +433,7 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
         return NIL;
     }
 
-    List *workerNodes = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
-                                                RowShareLock);
-    if (list_length(workerNodes) == 0)
-    {
-        return NIL;
-    }
+    UnmarkObjectDistributed(&dbAddress);
 
     char *dropDatabaseCommand = DeparseTreeNode(node);
@@ -504,23 +442,22 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString,
                      "SELECT pg_catalog.citus_internal_database_command(%s)",
                      quote_literal_cstr(dropDatabaseCommand));
 
-    /* Delete from pg_dist_object */
-    if (IsObjectDistributed(&dbAddress))
-    {
-        UnmarkObjectDistributed(&dbAddress);
-    }
-
-    /* ExecuteDistributedDDLJob could not be used since it depends on namespace and
-     * database does not have namespace.
-     */
-    bool outsideTransaction = false;
-    List *taskList = CreateDDLTaskList(internalDropCommand->data, workerNodes,
-                                       outsideTransaction);
-
-    bool localExecutionSupported = false;
-    ExecuteUtilityTaskList(taskList, localExecutionSupported);
-
-    return NIL;
+    List *commands = list_make3(DISABLE_DDL_PROPAGATION,
+                                (void *) internalDropCommand->data,
+                                ENABLE_DDL_PROPAGATION);
+
+    return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
+}
+
+
+List *
+CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess)
+{
+    CreatedbStmt *stmt = castNode(CreatedbStmt, node);
+    Oid databaseOid = get_database_oid(stmt->dbname, missing_ok);
+
+    ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress));
+    ObjectAddressSet(*dbAddress, DatabaseRelationId, databaseOid);
+
+    return list_make1(dbAddress);
 }
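Net effect of the refactor above: the hand-rolled CreateDDLTaskList()/ExecuteUtilityTaskList() plumbing is gone, and NodeDDLTaskList() ships the wrapped UDF call to the workers. Each worker should end up executing roughly this sequence (a sketch; the SET wording is assumed from the DISABLE/ENABLE_DDL_PROPAGATION macros, while the SELECT form matches the regress output further down):

    SET citus.enable_ddl_propagation TO 'off';
    SELECT pg_catalog.citus_internal_database_command('CREATE DATABASE mydatabase');
    SET citus.enable_ddl_propagation TO 'on';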


@@ -473,8 +473,8 @@ static DistributeObjectOps Database_Create = {
     .postprocess = PostprocessCreateDatabaseStmt,
     .objectType = OBJECT_DATABASE,
     .operationType = DIST_OPS_CREATE,
-    .address = NULL,
-    .markDistributed = false,
+    .address = CreateDatabaseStmtObjectAddress,
+    .markDistributed = true,
 };
 static DistributeObjectOps Database_Drop = {


@@ -938,7 +938,7 @@ CreateIndexTaskList(IndexStmt *indexStmt)
     task->dependentTaskList = NULL;
     task->anchorShardId = shardId;
     task->taskPlacementList = ActiveShardPlacementList(shardId);
-    task->cannotBeExecutedInTransaction = indexStmt->concurrent;
+    task->cannotBeExecutedInTransction = indexStmt->concurrent;
 
     taskList = lappend(taskList, task);
@@ -983,7 +983,7 @@ CreateReindexTaskList(Oid relationId, ReindexStmt *reindexStmt)
     task->dependentTaskList = NULL;
     task->anchorShardId = shardId;
     task->taskPlacementList = ActiveShardPlacementList(shardId);
-    task->cannotBeExecutedInTransaction =
+    task->cannotBeExecutedInTransction =
         IsReindexWithParam_compat(reindexStmt, "concurrently");
 
     taskList = lappend(taskList, task);
@@ -1309,7 +1309,7 @@ DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt)
     task->dependentTaskList = NULL;
     task->anchorShardId = shardId;
     task->taskPlacementList = ActiveShardPlacementList(shardId);
-    task->cannotBeExecutedInTransaction = dropStmt->concurrent;
+    task->cannotBeExecutedInTransction = dropStmt->concurrent;
 
     taskList = lappend(taskList, task);


@@ -279,7 +279,7 @@ VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams, List *vacuumColum
     task->replicationModel = REPLICATION_MODEL_INVALID;
     task->anchorShardId = shardId;
     task->taskPlacementList = ActiveShardPlacementList(shardId);
-    task->cannotBeExecutedInTransaction = ((vacuumParams.options) & VACOPT_VACUUM);
+    task->cannotBeExecutedInTransction = ((vacuumParams.options) & VACOPT_VACUUM);
 
     taskList = lappend(taskList, task);
 }
@@ -719,7 +719,7 @@ ExecuteUnqualifiedVacuumTasks(VacuumStmt *vacuumStmt, CitusVacuumParams vacuumPa
     SetTaskQueryStringList(task, unqualifiedVacuumCommands);
     task->dependentTaskList = NULL;
     task->replicationModel = REPLICATION_MODEL_INVALID;
-    task->cannotBeExecutedInTransaction = ((vacuumParams.options) & VACOPT_VACUUM);
+    task->cannotBeExecutedInTransction = ((vacuumParams.options) & VACOPT_VACUUM);
 
     bool hasPeerWorker = false;


@@ -31,36 +31,36 @@ optionToStatement(StringInfo buf, DefElem *option, const struct
 {
     if (strcmp(name, opt_formats[i].name) == 0)
     {
-        if (strcmp(opt_formats[i].type, "string") == 0)
+        if (opt_formats[i].type == OPTION_FORMAT_STRING)
         {
             char *value = defGetString(option);
             appendStringInfo(buf, opt_formats[i].format, quote_identifier(value));
         }
-        else if (strcmp(opt_formats[i].type, "integer") == 0)
+        else if (opt_formats[i].type == OPTION_FORMAT_INTEGER)
         {
             int32 value = defGetInt32(option);
             appendStringInfo(buf, opt_formats[i].format, value);
         }
-        else if (strcmp(opt_formats[i].type, "boolean") == 0)
+        else if (opt_formats[i].type == OPTION_FORMAT_BOOLEAN)
         {
             bool value = defGetBoolean(option);
             appendStringInfo(buf, opt_formats[i].format, value ? "true" : "false");
         }
 #if PG_VERSION_NUM >= PG_VERSION_15
-        else if (strcmp(opt_formats[i].type, "object_id") == 0)
+        else if (opt_formats[i].type == OPTION_FORMAT_OBJECT_ID)
         {
             Oid value = defGetObjectId(option);
             appendStringInfo(buf, opt_formats[i].format, value);
         }
 #endif
-        else if (strcmp(opt_formats[i].type, "literal_cstr") == 0)
+        else if (opt_formats[i].type == OPTION_FORMAT_LITERAL_CSTR)
         {
             char *value = defGetString(option);
             appendStringInfo(buf, opt_formats[i].format, quote_literal_cstr(value));
         }
         else
         {
-            elog(ERROR, "unrecognized option type: %s", opt_formats[i].type);
+            elog(ERROR, "unrecognized option type: %d", opt_formats[i].type);
         }
         break;
     }


@@ -30,22 +30,22 @@ static void AppendAlterDatabaseOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt);
 static void AppendAlterDatabaseStmt(StringInfo buf, AlterDatabaseStmt *stmt);
 
 const struct option_format create_database_option_formats[] = {
-    { "template", " TEMPLATE %s", "string" },
-    { "owner", " OWNER %s", "string" },
-    { "tablespace", " TABLESPACE %s", "string" },
-    { "connection_limit", " CONNECTION LIMIT %d", "integer" },
-    { "encoding", " ENCODING %s", "literal_cstr" },
-    { "locale", " LOCALE %s", "literal_cstr" },
-    { "lc_collate", " LC_COLLATE %s", "literal_cstr" },
-    { "lc_ctype", " LC_CTYPE %s", "literal_cstr" },
-    { "icu_locale", " ICU_LOCALE %s", "literal_cstr" },
-    { "icu_rules", " ICU_RULES %s", "literal_cstr" },
-    { "locale_provider", " LOCALE_PROVIDER %s", "literal_cstr" },
-    { "is_template", " IS_TEMPLATE %s", "boolean" },
-    { "allow_connections", " ALLOW_CONNECTIONS %s", "boolean" },
-    { "collation_version", " COLLATION_VERSION %s", "literal_cstr" },
-    { "strategy", " STRATEGY %s", "literal_cstr" },
-    { "oid", " OID %d", "object_id" },
+    { "owner", " OWNER %s", OPTION_FORMAT_STRING },
+    { "template", " TEMPLATE %s", OPTION_FORMAT_STRING },
+    { "encoding", " ENCODING %s", OPTION_FORMAT_LITERAL_CSTR },
+    { "strategy", " STRATEGY %s", OPTION_FORMAT_LITERAL_CSTR },
+    { "locale", " LOCALE %s", OPTION_FORMAT_LITERAL_CSTR },
+    { "lc_collate", " LC_COLLATE %s", OPTION_FORMAT_LITERAL_CSTR },
+    { "lc_ctype", " LC_CTYPE %s", OPTION_FORMAT_LITERAL_CSTR },
+    { "icu_locale", " ICU_LOCALE %s", OPTION_FORMAT_LITERAL_CSTR },
+    { "icu_rules", " ICU_RULES %s", OPTION_FORMAT_LITERAL_CSTR },
+    { "locale_provider", " LOCALE_PROVIDER %s", OPTION_FORMAT_LITERAL_CSTR },
+    { "collation_version", " COLLATION_VERSION %s", OPTION_FORMAT_LITERAL_CSTR },
+    { "tablespace", " TABLESPACE %s", OPTION_FORMAT_STRING },
+    { "allow_connections", " ALLOW_CONNECTIONS %s", OPTION_FORMAT_BOOLEAN },
+    { "connection_limit", " CONNECTION LIMIT %d", OPTION_FORMAT_INTEGER },
+    { "is_template", " IS_TEMPLATE %s", OPTION_FORMAT_BOOLEAN },
+    { "oid", " OID %d", OPTION_FORMAT_OBJECT_ID }
 };
@@ -289,8 +289,10 @@ DeparseCreateDatabaseStmt(Node *node)
 static void
 AppendDropDatabaseStmt(StringInfo buf, DropdbStmt *stmt)
 {
+    char *if_exists_statement = stmt->missing_ok ? "IF EXISTS" : "";
     appendStringInfo(buf,
-                     "DROP DATABASE %s",
+                     "DROP DATABASE %s %s",
+                     if_exists_statement,
                      quote_identifier(stmt->dbname));
 
     DefElem *option = NULL;
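
Given the format table above, a CREATE DATABASE with a few options deparses with identifier-quoted string options and literal-quoted locale options, e.g. (a sketch based on the formats, not regress output):

    CREATE DATABASE mydatabase OWNER myuser ENCODING 'UTF8' CONNECTION LIMIT 10

Note the new "DROP DATABASE %s %s" format: with missing_ok it yields DROP DATABASE IF EXISTS mydatabase (as in the regress output below), but without IF EXISTS the empty placeholder leaves a harmless double space.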


@@ -61,7 +61,7 @@ TaskListRequiresRollback(List *taskList)
     }
 
     Task *task = (Task *) linitial(taskList);
-    if (task->cannotBeExecutedInTransaction)
+    if (task->cannotBeExecutedInTransction)
     {
         /* vacuum, create index concurrently etc. */
         return false;
@@ -164,7 +164,7 @@ TaskListCannotBeExecutedInTransaction(List *taskList)
     Task *task = NULL;
     foreach_ptr(task, taskList)
     {
-        if (task->cannotBeExecutedInTransaction)
+        if (task->cannotBeExecutedInTransction)
         {
             return true;
         }


@@ -53,7 +53,6 @@
 static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress);
 static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
                                      Datum *paramValues);
-bool IsObjectDistributed(const ObjectAddress *address);
 
 PG_FUNCTION_INFO_V1(citus_unmark_object_distributed);
 PG_FUNCTION_INFO_V1(master_unmark_object_distributed);


@@ -310,6 +310,7 @@ static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relation
 static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod,
                                       Oid *columnTypeId, int32 *columnTypeMod,
                                       Oid *intervalTypeId, int32 *intervalTypeMod);
+static void CachedNamespaceLookup(const char *nspname, Oid *cachedOid);
 static void CachedRelationLookup(const char *relationName, Oid *cachedOid);
 static void CachedRelationLookupExtended(const char *relationName, Oid *cachedOid,
                                          bool missing_ok);
@@ -2769,6 +2770,15 @@ DistRebalanceStrategyRelationId(void)
 }
 
+/* return the oid of citus namespace */
+Oid
+CitusCatalogNamespaceId(void)
+{
+    CachedNamespaceLookup("citus", &MetadataCache.citusCatalogNamespaceId);
+    return MetadataCache.citusCatalogNamespaceId;
+}
+
 /* return oid of pg_dist_object relation */
 Oid
 DistObjectRelationId(void)
@@ -2795,14 +2805,12 @@ DistObjectRelationId(void)
                                           true);
     if (!OidIsValid(MetadataCache.distObjectRelationId))
     {
-        Oid citusNamespaceId = get_namespace_oid("citus", false);
-
         /*
          * We can only ever reach here while we are creating/altering our extension before
          * the table is moved to pg_catalog.
          */
         CachedRelationNamespaceLookupExtended("pg_dist_object",
-                                              citusNamespaceId,
+                                              CitusCatalogNamespaceId(),
                                               &MetadataCache.distObjectRelationId,
                                               false);
     }
@@ -2836,6 +2844,17 @@ DistObjectPrimaryKeyIndexId(void)
                                           &MetadataCache.distObjectPrimaryKeyIndexId,
                                           true);
 
+    if (!OidIsValid(MetadataCache.distObjectPrimaryKeyIndexId))
+    {
+        /*
+         * We can only ever reach here while we are creating/altering our extension before
+         * the table is moved to pg_catalog.
+         */
+        CachedRelationNamespaceLookupExtended("pg_dist_object_pkey",
+                                              CitusCatalogNamespaceId(),
+                                              &MetadataCache.distObjectPrimaryKeyIndexId,
+                                              false);
+    }
+
     return MetadataCache.distObjectPrimaryKeyIndexId;
 }
@@ -5401,6 +5420,30 @@ DeformedDistShardTupleToShardInterval(Datum *datumArray, bool *isNullArray,
 }
 
+/*
+ * CachedNamespaceLookup performs a cached lookup for the namespace (schema), with the
+ * result cached in cachedOid.
+ */
+static void
+CachedNamespaceLookup(const char *nspname, Oid *cachedOid)
+{
+    /* force callbacks to be registered, so we always get notified upon changes */
+    InitializeCaches();
+
+    if (*cachedOid == InvalidOid)
+    {
+        *cachedOid = get_namespace_oid(nspname, true);
+
+        if (*cachedOid == InvalidOid)
+        {
+            ereport(ERROR, (errmsg(
+                                "cache lookup failed for namespace %s, called too early?",
+                                nspname)));
+        }
+    }
+}
+
 /*
  * CachedRelationLookup performs a cached lookup for the relation
  * relationName, with the result cached in *cachedOid.


@@ -326,7 +326,7 @@ CopyNodeTask(COPYFUNC_ARGS)
     COPY_STRING_FIELD(fetchedExplainAnalyzePlan);
     COPY_SCALAR_FIELD(fetchedExplainAnalyzeExecutionDuration);
     COPY_SCALAR_FIELD(isLocalTableModification);
-    COPY_SCALAR_FIELD(cannotBeExecutedInTransaction);
+    COPY_SCALAR_FIELD(cannotBeExecutedInTransction);
 }


@@ -535,7 +535,7 @@ OutTask(OUTFUNC_ARGS)
     WRITE_STRING_FIELD(fetchedExplainAnalyzePlan);
     WRITE_FLOAT_FIELD(fetchedExplainAnalyzeExecutionDuration, "%.2f");
     WRITE_BOOL_FIELD(isLocalTableModification);
-    WRITE_BOOL_FIELD(cannotBeExecutedInTransaction);
+    WRITE_BOOL_FIELD(cannotBeExecutedInTransction);
 }


@@ -22,6 +22,7 @@
 #include "tcop/utility.h"
 #include "utils/acl.h"
 
+
 extern bool AddAllLocalTablesToMetadata;
 extern bool EnableSchemaBasedSharding;
@@ -57,6 +58,7 @@ typedef enum DistOpsOperationType
     DIST_OPS_DROP,
 } DistOpsOperationType;
 
+
 /*
  * DistributeObjectOps specifies handlers for node/object type pairs.
  * Instances of this type should all be declared in deparse.c.
@@ -77,11 +79,11 @@ typedef enum DistOpsOperationType
  */
 typedef struct DistributeObjectOps
 {
-    char *(*deparse)(Node *);
+    char * (*deparse)(Node *);
     void (*qualify)(Node *);
-    List *(*preprocess)(Node *, const char *, ProcessUtilityContext);
-    List *(*postprocess)(Node *, const char *);
-    List *(*address)(Node *, bool, bool);
+    List * (*preprocess)(Node *, const char *, ProcessUtilityContext);
+    List * (*postprocess)(Node *, const char *);
+    List * (*address)(Node *, bool, bool);
     bool markDistributed;
 
     /* fields used by common implementations, omitted for specialized implementations */
@@ -138,6 +140,7 @@ typedef enum ExtractForeignKeyConstraintsMode
     INCLUDE_SINGLE_SHARD_TABLES
 } ExtractForeignKeyConstraintMode;
 
+
 /*
  * Flags that can be passed to GetForeignKeyIdsForColumn to
  * indicate whether relationId argument should match:
@@ -156,6 +159,7 @@ typedef enum SearchForeignKeyColumnFlags
     /* callers can also pass union of above flags */
 } SearchForeignKeyColumnFlags;
 
+
 typedef enum TenantOperation
 {
     TENANT_UNDISTRIBUTE_TABLE = 0,
@@ -193,9 +197,11 @@ extern List * DropTextSearchDictObjectAddress(Node *node, bool missing_ok, bool
 /* index.c */
 typedef void (*PGIndexProcessor)(Form_pg_index, List **, int);
 
+
 /* call.c */
 extern bool CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest);
 
+
 /* collation.c - forward declarations */
 extern char * CreateCollationDDL(Oid collationId);
 extern List * CreateCollationDDLsIdempotent(Oid collationId);
@@ -224,6 +230,10 @@ extern List * PreprocessAlterDatabaseRefreshCollStmt(Node *node, const char *que
                                                      ProcessUtilityContext
                                                      processUtilityContext);
 
+extern List * CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool
+                                              isPostprocess);
+
 extern List * PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString,
                                              ProcessUtilityContext processUtilityContext);
@@ -250,6 +260,7 @@ extern List * RenameDomainStmtObjectAddress(Node *node, bool missing_ok, bool
 extern CreateDomainStmt * RecreateDomainStmt(Oid domainOid);
 extern Oid get_constraint_typid(Oid conoid);
 
+
 /* extension.c - forward declarations */
 extern bool IsDropCitusExtensionStmt(Node *parsetree);
 extern List * GetDependentFDWsToExtension(Oid extensionId);
@@ -328,11 +339,13 @@ extern Oid GetReferencedTableId(Oid foreignKeyId);
 extern Oid GetReferencingTableId(Oid foreignKeyId);
 extern bool RelationInvolvedInAnyNonInheritedForeignKeys(Oid relationId);
 
+
 /* foreign_data_wrapper.c - forward declarations */
 extern List * PreprocessGrantOnFDWStmt(Node *node, const char *queryString,
                                        ProcessUtilityContext processUtilityContext);
 extern Acl * GetPrivilegesForFDW(Oid FDWOid);
 
+
 /* foreign_server.c - forward declarations */
 extern List * PreprocessGrantOnForeignServerStmt(Node *node, const char *queryString,
                                                  ProcessUtilityContext
@@ -343,15 +356,17 @@ extern List * AlterForeignServerStmtObjectAddress(Node *node, bool missing_ok, b
                                                   isPostprocess);
 extern List * RenameForeignServerStmtObjectAddress(Node *node, bool missing_ok, bool
                                                    isPostprocess);
-extern List * AlterForeignServerOwnerStmtObjectAddress(Node *node, bool missing_ok, bool
-                                                       isPostprocess);
+extern List * AlterForeignServerOwnerStmtObjectAddress(Node *node, bool
+                                                       missing_ok, bool isPostprocess);
 extern List * GetForeignServerCreateDDLCommand(Oid serverId);
 
+
 /* foreign_table.c - forward declarations */
 extern List * PreprocessAlterForeignTableSchemaStmt(Node *node, const char *queryString,
                                                     ProcessUtilityContext
                                                     processUtilityContext);
 
+
 /* function.c - forward declarations */
 extern List * PreprocessCreateFunctionStmt(Node *stmt, const char *queryString,
                                            ProcessUtilityContext processUtilityContext);
@@ -381,12 +396,14 @@ extern List * PreprocessGrantOnFunctionStmt(Node *node, const char *queryString,
                                             ProcessUtilityContext processUtilityContext);
 extern List * PostprocessGrantOnFunctionStmt(Node *node, const char *queryString);
 
+
 /* grant.c - forward declarations */
 extern List * PreprocessGrantStmt(Node *node, const char *queryString,
                                   ProcessUtilityContext processUtilityContext);
 extern void deparsePrivileges(StringInfo privsString, GrantStmt *grantStmt);
 extern void deparseGrantees(StringInfo granteesString, GrantStmt *grantStmt);
 
+
 /* index.c - forward declarations */
 extern bool IsIndexRenameStmt(RenameStmt *renameStmt);
 extern List * PreprocessIndexStmt(Node *createIndexStatement,
@@ -467,6 +484,7 @@ extern void ErrorIfUnsupportedRenameStmt(RenameStmt *renameStmt);
 extern List * PreprocessRenameAttributeStmt(Node *stmt, const char *queryString,
                                             ProcessUtilityContext processUtilityContext);
 
+
 /* role.c - forward declarations*/
 extern List * PostprocessAlterRoleStmt(Node *stmt, const char *queryString);
 extern List * PreprocessAlterRoleSetStmt(Node *stmt, const char *queryString,
@@ -589,6 +607,7 @@ extern List * GetAlterIndexStatisticsCommands(Oid indexOid);
 
+
 /* subscription.c - forward declarations */
 extern Node * ProcessCreateSubscriptionStmt(CreateSubscriptionStmt *createSubStmt);
 
+
 /* table.c - forward declarations */
 extern List * PreprocessDropTableStmt(Node *stmt, const char *queryString,
                                       ProcessUtilityContext processUtilityContext);


@@ -127,9 +127,18 @@ struct option_format
 {
     const char *name;
     const char *format;
-    const char *type;
+    const int type;
 };
 
+typedef enum OptionFormatType
+{
+    OPTION_FORMAT_STRING,
+    OPTION_FORMAT_LITERAL_CSTR,
+    OPTION_FORMAT_BOOLEAN,
+    OPTION_FORMAT_INTEGER,
+    OPTION_FORMAT_OBJECT_ID
+} OptionFormatType;
+
 extern void optionToStatement(StringInfo buf, DefElem *option, const struct
                               option_format *opt_formats, int


@@ -329,7 +329,7 @@ typedef struct Task
     /*
     * Vacuum, create/drop/reindex concurrently cannot be executed in a transaction.
     */
-    bool cannotBeExecutedInTransaction;
+    bool cannotBeExecutedInTransction;
 
     Const *partitionKeyValue;
     int colocationId;


@@ -102,6 +102,22 @@ WHERE datname = 'mydatabase';
 (0 rows)
 
 \c - - - :master_port
+--tests for special characters in database name
+set citus.enable_create_database_propagation=on;
+SET citus.log_remote_commands = true;
+set citus.grep_remote_commands = '%CREATE DATABASE%';
+create database "mydatabase#1'2";
+NOTICE: issuing SELECT pg_catalog.citus_internal_database_command('CREATE DATABASE "mydatabase#1''2"')
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+NOTICE: issuing SELECT pg_catalog.citus_internal_database_command('CREATE DATABASE "mydatabase#1''2"')
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+set citus.grep_remote_commands = '%DROP DATABASE%';
+drop database if exists "mydatabase#1'2";
+NOTICE: issuing SELECT pg_catalog.citus_internal_database_command('DROP DATABASE IF EXISTS "mydatabase#1''2"')
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+NOTICE: issuing SELECT pg_catalog.citus_internal_database_command('DROP DATABASE IF EXISTS "mydatabase#1''2"')
+DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
+--clean up resources created by this test
 drop tablespace create_drop_db_tablespace;
 \c - - - :worker_1_port
 drop tablespace create_drop_db_tablespace;


@@ -94,6 +94,18 @@ WHERE datname = 'mydatabase';
 \c - - - :master_port
 
+--tests for special characters in database name
+set citus.enable_create_database_propagation=on;
+SET citus.log_remote_commands = true;
+set citus.grep_remote_commands = '%CREATE DATABASE%';
+create database "mydatabase#1'2";
+set citus.grep_remote_commands = '%DROP DATABASE%';
+drop database if exists "mydatabase#1'2";
+--clean up resources created by this test
 drop tablespace create_drop_db_tablespace;
 \c - - - :worker_1_port