From 1f29c16262d7c5c0b5f5c8ad33096f6610fc67b4 Mon Sep 17 00:00:00 2001 From: Hanefi Onaldi Date: Thu, 1 Dec 2022 15:43:02 +0300 Subject: [PATCH 1/9] Fix misleading GUC description (#6532) citus.skip_advisory_lock_permission_checks skips checks when it is set to 'on', not 'off' --- src/backend/distributed/shared_library_init.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 81ad43f38..49eaf3063 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -2178,7 +2178,7 @@ RegisterCitusConfigVariables(void) "citus.skip_advisory_lock_permission_checks", gettext_noop("Postgres would normally enforce some " "ownership checks while acquiring locks. " - "When this setting is 'off', Citus skips" + "When this setting is 'on', Citus skips" "ownership checks on internal advisory " "locks."), NULL, From c2193608c90fb02b023795b32454067174809f4a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?G=C3=BCrkan=20=C4=B0ndibay?= Date: Thu, 1 Dec 2022 19:11:41 +0300 Subject: [PATCH 2/9] Add jobs to test builds on different distros (#6499) With this PR, citus code will be tested in all packaging environments. Sometimes, there can be compile errors which blocks packaging and in this case unplanned delays may occur. By testing the code in packaging environments, I'm aiming to detect any compilation errors before packaging. Co-authored-by: Onur Tirtir Co-authored-by: Hanefi Onaldi --- .github/packaging/packaging_ignore.yml | 3 + .github/packaging/validate_build_output.sh | 6 + .../workflows/packaging-test-pipelines.yml | 158 ++++++++++++++++++ src/bin/pg_send_cancellation/Makefile | 6 +- 4 files changed, 172 insertions(+), 1 deletion(-) create mode 100644 .github/packaging/packaging_ignore.yml create mode 100755 .github/packaging/validate_build_output.sh create mode 100644 .github/workflows/packaging-test-pipelines.yml diff --git a/.github/packaging/packaging_ignore.yml b/.github/packaging/packaging_ignore.yml new file mode 100644 index 000000000..c46f28f5e --- /dev/null +++ b/.github/packaging/packaging_ignore.yml @@ -0,0 +1,3 @@ +base: + - ".* warning: ignoring old recipe for target [`']check'" + - ".* warning: overriding recipe for target [`']check'" diff --git a/.github/packaging/validate_build_output.sh b/.github/packaging/validate_build_output.sh new file mode 100755 index 000000000..4ae588574 --- /dev/null +++ b/.github/packaging/validate_build_output.sh @@ -0,0 +1,6 @@ +package_type=${1} +git clone -b v0.8.23 --depth=1 https://github.com/citusdata/tools.git tools +python3 -m pip install -r tools/packaging_automation/requirements.txt +python3 -m tools.packaging_automation.validate_build_output --output_file output.log \ + --ignore_file .github/packaging/packaging_ignore.yml \ + --package_type ${package_type} diff --git a/.github/workflows/packaging-test-pipelines.yml b/.github/workflows/packaging-test-pipelines.yml new file mode 100644 index 000000000..f79619b21 --- /dev/null +++ b/.github/workflows/packaging-test-pipelines.yml @@ -0,0 +1,158 @@ +name: Build tests in packaging images + +on: + push: + branches: "**" + + workflow_dispatch: + +jobs: + + get_postgres_versions_from_file: + runs-on: ubuntu-latest + outputs: + pg_versions: ${{ steps.get-postgres-versions.outputs.pg_versions }} + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + fetch-depth: 2 + - name: Get Postgres Versions + id: get-postgres-versions + run: | + # Postgres 
versions are stored in .circleci/config.yml file in "build-[pg-version] format. Below command + # extracts the versions and get the unique values. + pg_versions=`grep -Eo 'build-[[:digit:]]{2}' .circleci/config.yml|sed -e "s/^build-//"|sort|uniq|tr '\n' ','| head -c -1` + pg_versions_array="[ ${pg_versions} ]" + echo "Supported PG Versions: ${pg_versions_array}" + # Below line is needed to set the output variable to be used in the next job + echo "pg_versions=${pg_versions_array}" >> $GITHUB_OUTPUT + + rpm_build_tests: + name: rpm_build_tests + needs: get_postgres_versions_from_file + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + # While we use separate images for different Postgres versions in rpm + # based distros + # For this reason, we need to use a "matrix" to generate names of + # rpm images, e.g. citus/packaging:centos-7-pg12 + packaging_docker_image: + - oraclelinux-7 + - oraclelinux-8 + - centos-7 + - centos-8 + - almalinux-9 + POSTGRES_VERSION: ${{ fromJson(needs.get_postgres_versions_from_file.outputs.pg_versions) }} + + container: + image: citus/packaging:${{ matrix.packaging_docker_image }}-pg${{ matrix.POSTGRES_VERSION }} + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Add Postgres installation directory into PATH for rpm based distros + run: | + echo "/usr/pgsql-${{ matrix.POSTGRES_VERSION }}/bin" >> $GITHUB_PATH + + - name: Configure + run: | + echo "Current Shell:$0" + echo "GCC Version: $(gcc --version)" + ./configure 2>&1 | tee output.log + + - name: Make clean + run: | + make clean + + - name: Make + run: | + make CFLAGS="-Wno-missing-braces" -sj$(cat /proc/cpuinfo | grep "core id" | wc -l) 2>&1 | tee -a output.log + + - name: Make install + run: | + make CFLAGS="-Wno-missing-braces" install 2>&1 | tee -a output.log + + - name: Validate output + env: + POSTGRES_VERSION: ${{ matrix.POSTGRES_VERSION }} + PACKAGING_DOCKER_IMAGE: ${{ matrix.packaging_docker_image }} + run: | + echo "Postgres version: ${POSTGRES_VERSION}" + + ## Install required packages to execute packaging tools for rpm based distros + yum install python3-pip python3-devel postgresql-devel -y + python3 -m pip install wheel + + ./.github/packaging/validate_build_output.sh "rpm" + + deb_build_tests: + name: deb_build_tests + needs: get_postgres_versions_from_file + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + # On deb based distros, we use the same docker image for + # builds based on different Postgres versions because deb + # based images include all postgres installations. + # For this reason, we have multiple runs --which is 3 today-- + # for each deb based image and we use POSTGRES_VERSION to set + # PG_CONFIG variable in each of those runs. 
+ packaging_docker_image: + - debian-buster-all + - debian-bookworm-all + - debian-bullseye-all + - ubuntu-bionic-all + - ubuntu-focal-all + - ubuntu-jammy-all + - ubuntu-kinetic-all + + POSTGRES_VERSION: ${{ fromJson(needs.get_postgres_versions_from_file.outputs.pg_versions) }} + + container: + image: citus/packaging:${{ matrix.packaging_docker_image }} + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Set pg_config path to related Postgres version + run: | + echo "PG_CONFIG=/usr/lib/postgresql/${{ matrix.POSTGRES_VERSION }}/bin/pg_config" >> $GITHUB_ENV + + - name: Configure + run: | + echo "Current Shell:$0" + echo "GCC Version: $(gcc --version)" + ./configure 2>&1 | tee output.log + + - name: Make clean + run: | + make clean + + - name: Make + run: | + make -sj$(cat /proc/cpuinfo | grep "core id" | wc -l) 2>&1 | tee -a output.log + + - name: Make install + run: | + make install 2>&1 | tee -a output.log + + - name: Validate output + env: + POSTGRES_VERSION: ${{ matrix.POSTGRES_VERSION }} + PACKAGING_DOCKER_IMAGE: ${{ matrix.packaging_docker_image }} + run: | + echo "Postgres version: ${POSTGRES_VERSION}" + + apt-get update -y + ## Install required packages to execute packaging tools for deb based distros + apt install python3-dev python3-pip -y + sudo apt-get purge -y python3-yaml + python3 -m pip install --upgrade pip setuptools==57.5.0 + + ./.github/packaging/validate_build_output.sh "deb" diff --git a/src/bin/pg_send_cancellation/Makefile b/src/bin/pg_send_cancellation/Makefile index 7cf76757f..4515c5019 100644 --- a/src/bin/pg_send_cancellation/Makefile +++ b/src/bin/pg_send_cancellation/Makefile @@ -10,7 +10,11 @@ PG_LDFLAGS += $(LDFLAGS) include $(citus_top_builddir)/Makefile.global # We reuse all the Citus flags (incl. security flags), but we are building a program not a shared library -override CFLAGS := $(filter-out -shared,$(CFLAGS)) +# We sometimes build Citus with a newer version of gcc than Postgres was built +# with and this breaks LTO (link-time optimization). Even if disabling it can +# have some perf impact this is ok because pg_send_cancellation is only used +# for tests anyway. +override CFLAGS := $(filter-out -shared, $(CFLAGS)) -fno-lto # Filter out unneeded dependencies override LIBS := $(filter-out -lz -lreadline -ledit -ltermcap -lncurses -lcurses -lpam, $(LIBS)) From 29f0196fdfe1c80b3e37e9ed5fe2bf7534401e6c Mon Sep 17 00:00:00 2001 From: songjinzhou <49380232+TsinghuaLucky912@users.noreply.github.com> Date: Fri, 2 Dec 2022 00:45:32 +0800 Subject: [PATCH 3/9] Add support for SET ACCESS METHOD in altering a distributed table (#6525) Co-authored-by: TsinghuaLucky912 --- src/backend/distributed/commands/table.c | 4 ++++ src/test/regress/expected/pg15.out | 30 ++++++++++++++++++++++++ src/test/regress/sql/pg15.sql | 15 ++++++++++++ 3 files changed, 49 insertions(+) diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index 1be61cbe2..8101dc964 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -2992,6 +2992,9 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement) break; } +#if PG_VERSION_NUM >= PG_VERSION_15 + case AT_SetAccessMethod: +#endif case AT_SetNotNull: case AT_ReplicaIdentity: case AT_ChangeOwner: @@ -3007,6 +3010,7 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement) { /* * We will not perform any special check for: + * ALTER TABLE .. SET ACCESS METHOD .. * ALTER TABLE .. ALTER COLUMN .. 
SET NOT NULL * ALTER TABLE .. REPLICA IDENTITY .. * ALTER TABLE .. VALIDATE CONSTRAINT .. diff --git a/src/test/regress/expected/pg15.out b/src/test/regress/expected/pg15.out index be14f70a7..5dcd2c04d 100644 --- a/src/test/regress/expected/pg15.out +++ b/src/test/regress/expected/pg15.out @@ -1456,6 +1456,36 @@ NOTICE: drop cascades to 2 other objects CREATE DATABASE db_with_oid OID 987654; NOTICE: Citus partially supports CREATE DATABASE for distributed databases DROP DATABASE db_with_oid; +-- SET ACCESS METHOD +-- Create a heap2 table am handler with heapam handler +CREATE ACCESS METHOD heap2 TYPE TABLE HANDLER heap_tableam_handler; +SELECT run_command_on_workers($$CREATE ACCESS METHOD heap2 TYPE TABLE HANDLER heap_tableam_handler$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,"CREATE ACCESS METHOD") + (localhost,57638,t,"CREATE ACCESS METHOD") +(2 rows) + +CREATE TABLE mx_ddl_table2 ( + key int primary key, + value int +); +SELECT create_distributed_table('mx_ddl_table2', 'key', 'hash', shard_count=> 4); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +ALTER TABLE mx_ddl_table2 SET ACCESS METHOD heap2; +DROP TABLE mx_ddl_table2; +DROP ACCESS METHOD heap2; +SELECT run_command_on_workers($$DROP ACCESS METHOD heap2$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,"DROP ACCESS METHOD") + (localhost,57638,t,"DROP ACCESS METHOD") +(2 rows) + -- Clean up \set VERBOSITY terse SET client_min_messages TO ERROR; diff --git a/src/test/regress/sql/pg15.sql b/src/test/regress/sql/pg15.sql index 04dc46910..c334671bc 100644 --- a/src/test/regress/sql/pg15.sql +++ b/src/test/regress/sql/pg15.sql @@ -924,6 +924,21 @@ DROP SERVER foreign_server CASCADE; CREATE DATABASE db_with_oid OID 987654; DROP DATABASE db_with_oid; +-- SET ACCESS METHOD +-- Create a heap2 table am handler with heapam handler +CREATE ACCESS METHOD heap2 TYPE TABLE HANDLER heap_tableam_handler; +SELECT run_command_on_workers($$CREATE ACCESS METHOD heap2 TYPE TABLE HANDLER heap_tableam_handler$$); +CREATE TABLE mx_ddl_table2 ( + key int primary key, + value int +); +SELECT create_distributed_table('mx_ddl_table2', 'key', 'hash', shard_count=> 4); +ALTER TABLE mx_ddl_table2 SET ACCESS METHOD heap2; + +DROP TABLE mx_ddl_table2; +DROP ACCESS METHOD heap2; +SELECT run_command_on_workers($$DROP ACCESS METHOD heap2$$); + -- Clean up \set VERBOSITY terse SET client_min_messages TO ERROR; From 37f3dff1ca169167471cb7cb1415baa70b296ebf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabr=C3=ADzio=20de=20Royes=20Mello?= Date: Thu, 1 Dec 2022 16:05:40 -0300 Subject: [PATCH 4/9] Simplify columnar perf example (#6526) Rewrite the plpython function to generate random words in SQL to simplify the usage and run the example. 
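As a quick, illustrative usage sketch (not part of the patch itself; it simply assumes the
random_words function defined in the README hunk below has been created):

    -- build a string of roughly n random words, e.g. 'three nine zero four five'
    SELECT random_words(5);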
--- src/backend/columnar/README.md | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/backend/columnar/README.md b/src/backend/columnar/README.md index e8681e0f3..ca3532dba 100644 --- a/src/backend/columnar/README.md +++ b/src/backend/columnar/README.md @@ -234,16 +234,14 @@ CREATE TABLE perf_columnar(LIKE perf_row) USING COLUMNAR; ## Data ```sql -CREATE OR REPLACE FUNCTION random_words(n INT4) RETURNS TEXT LANGUAGE plpython2u AS $$ -import random -t = '' -words = ['zero','one','two','three','four','five','six','seven','eight','nine','ten'] -for i in xrange(0,n): - if (i != 0): - t += ' ' - r = random.randint(0,len(words)-1) - t += words[r] -return t +CREATE OR REPLACE FUNCTION random_words(n INT4) RETURNS TEXT LANGUAGE sql AS $$ + WITH words(w) AS ( + SELECT ARRAY['zero','one','two','three','four','five','six','seven','eight','nine','ten'] + ), + random (word) AS ( + SELECT w[(random()*array_length(w, 1))::int] FROM generate_series(1, $1) AS i, words + ) + SELECT string_agg(word, ' ') FROM random; $$; ``` From d4394b2e2db3a64ff34cf88678e1fbb3051428b1 Mon Sep 17 00:00:00 2001 From: Hanefi Onaldi Date: Thu, 1 Dec 2022 23:42:47 +0300 Subject: [PATCH 5/9] Fix spacing in multiline strings (#6533) When using multiline strings, we occasionally forget to add a single space at the end of the first line. When this line is concatenated with the next one, the resulting string has a missing space. --- .../distributed/metadata/node_metadata.c | 2 +- .../distributed/operations/shard_transfer.c | 2 +- .../replication/multi_logical_replication.c | 2 +- src/backend/distributed/shared_library_init.c | 30 +++++++++---------- .../distributed/utils/background_jobs.c | 2 +- 5 files changed, 19 insertions(+), 19 deletions(-) diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index edd72e71f..d133c50b2 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -2232,7 +2232,7 @@ SetWorkerColumnOptional(WorkerNode *workerNode, int columnIndex, Datum value) { /* metadata out of sync, mark the worker as not synced */ ereport(WARNING, (errmsg("Updating the metadata of the node %s:%d " - "is failed on node %s:%d." + "is failed on node %s:%d. 
" "Metadata on %s:%d is marked as out of sync.", workerNode->workerName, workerNode->workerPort, worker->workerName, worker->workerPort, diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index 9e777f2a1..4c20c0433 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -553,7 +553,7 @@ CheckSpaceConstraints(MultiConnection *connection, uint64 colocationSizeInBytes) { ereport(ERROR, (errmsg("not enough empty space on node if the shard is moved, " "actual available space after move will be %ld bytes, " - "desired available space after move is %ld bytes," + "desired available space after move is %ld bytes, " "estimated size increase on node after move is %ld bytes.", diskAvailableInBytesAfterShardMove, desiredNewDiskAvailableInBytes, colocationSizeInBytes), diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 051c88735..0157bb545 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -604,7 +604,7 @@ CreateReplicaIdentitiesOnNode(List *shardList, char *nodeName, int32 nodePort) if (commandList != NIL) { - ereport(DEBUG1, (errmsg("Creating replica identity for shard %ld on" + ereport(DEBUG1, (errmsg("Creating replica identity for shard %ld on " "target node %s:%d", shardId, nodeName, nodePort))); SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 49eaf3063..c15268056 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1054,8 +1054,8 @@ RegisterCitusConfigVariables(void) gettext_noop( "Sets how many percentage of free disk space should be after a shard move"), gettext_noop( - "This setting controls how much free space should be available after a shard move." - "If the free disk space will be lower than this parameter, then shard move will result in" + "This setting controls how much free space should be available after a shard move. " + "If the free disk space will be lower than this parameter, then shard move will result in " "an error."), &DesiredPercentFreeAfterMove, 10.0, 0.0, 100.0, @@ -1448,7 +1448,7 @@ RegisterCitusConfigVariables(void) "parallelization"), gettext_noop("When enabled, Citus will force the executor to use " "as many connections as possible while executing a " - "parallel distributed query. If not enabled, the executor" + "parallel distributed query. If not enabled, the executor " "might choose to use less connections to optimize overall " "query execution throughput. Internally, setting this true " "will end up with using one connection per task."), @@ -1487,7 +1487,7 @@ RegisterCitusConfigVariables(void) DefineCustomBoolVariable( "citus.hide_citus_dependent_objects", gettext_noop( - "Hides some objects, which depends on citus extension, from pg meta class queries." + "Hides some objects, which depends on citus extension, from pg meta class queries. " "It is intended to be used only before postgres vanilla tests to not break them."), NULL, &HideCitusDependentObjects, @@ -1594,10 +1594,10 @@ RegisterCitusConfigVariables(void) gettext_noop("defines the behaviour when a distributed table " "is joined with a local table"), gettext_noop( - "There are 4 values available. 
The default, 'auto' will recursively plan" - "distributed tables if there is a constant filter on a unique index." - "'prefer-local' will choose local tables if possible." - "'prefer-distributed' will choose distributed tables if possible" + "There are 4 values available. The default, 'auto' will recursively plan " + "distributed tables if there is a constant filter on a unique index. " + "'prefer-local' will choose local tables if possible. " + "'prefer-distributed' will choose distributed tables if possible. " "'never' will basically skip local table joins." ), &LocalTableJoinPolicy, @@ -1816,11 +1816,11 @@ RegisterCitusConfigVariables(void) "health status are tracked in a shared hash table on " "the master node. This configuration value limits the " "size of the hash table, and consequently the maximum " - "number of worker nodes that can be tracked." + "number of worker nodes that can be tracked. " "Citus keeps some information about the worker nodes " "in the shared memory for certain optimizations. The " "optimizations are enforced up to this number of worker " - "nodes. Any additional worker nodes may not benefit from" + "nodes. Any additional worker nodes may not benefit from " "the optimizations."), &MaxWorkerNodesTracked, 2048, 1024, INT_MAX, @@ -1994,7 +1994,7 @@ RegisterCitusConfigVariables(void) gettext_noop("When enabled, the executor waits until all the connections " "are successfully established."), gettext_noop("Under some load, the executor may decide to establish some " - "extra connections to further parallelize the execution. However," + "extra connections to further parallelize the execution. However, " "before the connection establishment is done, the execution might " "have already finished. When this GUC is set to true, the execution " "waits for such connections to be established."), @@ -2092,7 +2092,7 @@ RegisterCitusConfigVariables(void) "citus.replication_model", gettext_noop("Deprecated. Please use citus.shard_replication_factor instead"), gettext_noop( - "Shard replication model is determined by the shard replication factor." + "Shard replication model is determined by the shard replication factor. " "'statement' replication is used only when the replication factor is one."), &ReplicationModel, REPLICATION_MODEL_STREAMING, @@ -2178,7 +2178,7 @@ RegisterCitusConfigVariables(void) "citus.skip_advisory_lock_permission_checks", gettext_noop("Postgres would normally enforce some " "ownership checks while acquiring locks. " - "When this setting is 'on', Citus skips" + "When this setting is 'on', Citus skips " "ownership checks on internal advisory " "locks."), NULL, @@ -2225,7 +2225,7 @@ RegisterCitusConfigVariables(void) gettext_noop("This feature is not intended for users. It is developed " "to get consistent regression test outputs. When enabled, " "the RETURNING clause returns the tuples sorted. The sort " - "is done for all the entries, starting from the first one." + "is done for all the entries, starting from the first one. " "Finally, the sorting is done in ASC order."), &SortReturning, false, @@ -2283,7 +2283,7 @@ RegisterCitusConfigVariables(void) "It means that the queries are likely to return wrong results " "unless the user is absolutely sure that pushing down the " "subquery is safe. This GUC is maintained only for backward " - "compatibility, no new users are supposed to use it. The planner" + "compatibility, no new users are supposed to use it. 
The planner " "is capable of pushing down as much computation as possible to the " "shards depending on the query."), &SubqueryPushdown, diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index 45f7a842f..2032b7e65 100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -543,7 +543,7 @@ CheckAndResetLastWorkerAllocationFailure( GetCurrentTimestamp(), &secs, µsecs); ereport(LOG, (errmsg( - "able to start a background worker with %ld seconds" + "able to start a background worker with %ld seconds " "delay", secs))); queueMonitorExecutionContext->backgroundWorkerFailedStartTime = 0; From 3b24c47470e9c137a3306ace5ef31e073ae5f8fa Mon Sep 17 00:00:00 2001 From: Ahmet Gedemenli Date: Fri, 2 Dec 2022 12:39:36 +0300 Subject: [PATCH 6/9] Fix flaky cleanup tests (#6530) We are having some flakiness in our test schedule because of the objects leftover from shard moves/splits. With this commit we prevent logging cleanup object counts. fixes: #6534 --- .../citus_non_blocking_split_columnar.out | 3 +- ...citus_non_blocking_split_shard_cleanup.out | 5 +++ .../citus_non_blocking_split_shards.out | 33 +++++++++++++-- .../failure_online_move_shard_placement.out | 3 +- .../expected/failure_split_cleanup.out | 40 +++++++++---------- .../sql/citus_non_blocking_split_columnar.sql | 2 + ...citus_non_blocking_split_shard_cleanup.sql | 5 +++ .../sql/citus_non_blocking_split_shards.sql | 13 ++++++ .../failure_online_move_shard_placement.sql | 2 + .../regress/sql/failure_split_cleanup.sql | 24 +++++------ 10 files changed, 93 insertions(+), 37 deletions(-) diff --git a/src/test/regress/expected/citus_non_blocking_split_columnar.out b/src/test/regress/expected/citus_non_blocking_split_columnar.out index 3098b98c1..d1ce4d6a7 100644 --- a/src/test/regress/expected/citus_non_blocking_split_columnar.out +++ b/src/test/regress/expected/citus_non_blocking_split_columnar.out @@ -525,8 +525,9 @@ NOTICE: cleaned up 11 orphaned resources -- END: Split a partition table directly -- BEGIN: Perform deferred cleanup. +SET client_min_messages TO WARNING; CALL pg_catalog.citus_cleanup_orphaned_resources(); -NOTICE: cleaned up 11 orphaned resources +RESET client_min_messages; -- END: Perform deferred cleanup. -- BEGIN: Validate Shard Info and Data SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport diff --git a/src/test/regress/expected/citus_non_blocking_split_shard_cleanup.out b/src/test/regress/expected/citus_non_blocking_split_shard_cleanup.out index bf6467779..d7a1bc47d 100644 --- a/src/test/regress/expected/citus_non_blocking_split_shard_cleanup.out +++ b/src/test/regress/expected/citus_non_blocking_split_shard_cleanup.out @@ -47,6 +47,11 @@ SELECT pg_catalog.citus_split_shard_by_split_points( (1 row) +-- BEGIN: Perform deferred cleanup. +SET client_min_messages TO WARNING; +CALL pg_catalog.citus_cleanup_orphaned_resources(); +RESET client_min_messages; +-- END: Perform deferred cleanup. 
\c - - - :worker_1_port SET search_path TO "citus_split_test_schema"; SET citus.show_shards_for_app_name_prefixes = '*'; diff --git a/src/test/regress/expected/citus_non_blocking_split_shards.out b/src/test/regress/expected/citus_non_blocking_split_shards.out index ad0afd3e8..f08f9c428 100644 --- a/src/test/regress/expected/citus_non_blocking_split_shards.out +++ b/src/test/regress/expected/citus_non_blocking_split_shards.out @@ -237,8 +237,9 @@ SELECT pg_catalog.citus_split_shard_by_split_points( (1 row) -- BEGIN: Perform deferred cleanup. +SET client_min_messages TO WARNING; CALL pg_catalog.citus_cleanup_orphaned_resources(); -NOTICE: cleaned up 3 orphaned resources +RESET client_min_messages; -- END: Perform deferred cleanup. -- Perform 3 way split SELECT pg_catalog.citus_split_shard_by_split_points( @@ -253,8 +254,9 @@ SELECT pg_catalog.citus_split_shard_by_split_points( -- END : Split two shards : One with move and One without move. -- BEGIN: Perform deferred cleanup. +SET client_min_messages TO WARNING; CALL pg_catalog.citus_cleanup_orphaned_resources(); -NOTICE: cleaned up 3 orphaned resources +RESET client_min_messages; -- END: Perform deferred cleanup. -- BEGIN : Move a shard post split. SELECT citus_move_shard_placement(8981007, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='block_writes'); @@ -474,7 +476,9 @@ ERROR: cannot use logical replication to transfer shards of the relation table_ DETAIL: UPDATE and DELETE commands on the shard will error out during logical replication unless there is a REPLICA IDENTITY or PRIMARY KEY. HINT: If you wish to continue without a replica identity set the shard_transfer_mode to 'force_logical' or 'block_writes'. -- BEGIN: Perform deferred cleanup. +SET client_min_messages TO WARNING; CALL pg_catalog.citus_cleanup_orphaned_resources(); +RESET client_min_messages; -- END: Perform deferred cleanup. SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport FROM pg_dist_shard AS shard @@ -522,8 +526,9 @@ SELECT pg_catalog.citus_split_shard_by_split_points( (1 row) -- BEGIN: Perform deferred cleanup. +SET client_min_messages TO WARNING; CALL pg_catalog.citus_cleanup_orphaned_resources(); -NOTICE: cleaned up 3 orphaned resources +RESET client_min_messages; -- END: Perform deferred cleanup. 
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport FROM pg_dist_shard AS shard @@ -580,6 +585,28 @@ SELECT COUNT(*) FROM colocated_dist_table; -- END: Validate Data Count --BEGIN : Cleanup \c - postgres - :master_port +-- make sure we don't have any replication objects leftover on the workers +SELECT run_command_on_workers($$SELECT count(*) FROM pg_replication_slots$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,0) + (localhost,57638,t,0) +(2 rows) + +SELECT run_command_on_workers($$SELECT count(*) FROM pg_publication$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,0) + (localhost,57638,t,0) +(2 rows) + +SELECT run_command_on_workers($$SELECT count(*) FROM pg_subscription$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,0) + (localhost,57638,t,0) +(2 rows) + ALTER SYSTEM RESET citus.defer_shard_delete_interval; SELECT pg_reload_conf(); pg_reload_conf diff --git a/src/test/regress/expected/failure_online_move_shard_placement.out b/src/test/regress/expected/failure_online_move_shard_placement.out index 15684a3b1..71a3b2527 100644 --- a/src/test/regress/expected/failure_online_move_shard_placement.out +++ b/src/test/regress/expected/failure_online_move_shard_placement.out @@ -325,8 +325,9 @@ SELECT run_command_on_workers($$DROP SUBSCRIPTION IF EXISTS citus_shard_move_sub -- cleanup leftovers -- verify we don't see any error for already dropped subscription +SET client_min_messages TO WARNING; CALL citus_cleanup_orphaned_resources(); -NOTICE: cleaned up 3 orphaned resources +RESET client_min_messages; -- cancellation on dropping subscription SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").cancel(' || :pid || ')'); mitmproxy diff --git a/src/test/regress/expected/failure_split_cleanup.out b/src/test/regress/expected/failure_split_cleanup.out index 5765d3e4f..ce06f4978 100644 --- a/src/test/regress/expected/failure_split_cleanup.out +++ b/src/test/regress/expected/failure_split_cleanup.out @@ -52,11 +52,11 @@ WARNING: failed to clean up 2 orphaned shards out of 5 after a citus_split_shar ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- - 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 + 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 (3 rows) @@ -101,7 +101,7 @@ CONTEXT: while executing command on localhost:xxxxx CALL pg_catalog.citus_cleanup_orphaned_resources(); NOTICE: cleaned up 3 orphaned resources SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- (0 rows) @@ -155,11 +155,11 @@ NOTICE: cleaned up 3 
orphaned resources ERROR: Failed to run worker_split_shard_replication_setup UDF. It should successfully execute for splitting a shard in a non-blocking way. Please retry. RESET client_min_messages; SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- - 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 + 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 (4 rows) @@ -207,7 +207,7 @@ ERROR: Failed to run worker_split_shard_replication_setup UDF. It should succes CALL pg_catalog.citus_cleanup_orphaned_resources(); NOTICE: cleaned up 4 orphaned resources SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- (0 rows) @@ -266,11 +266,11 @@ WARNING: failed to clean up 2 orphaned shards out of 7 after a citus_split_shar ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- - 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 + 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 @@ -319,7 +319,7 @@ CONTEXT: while executing command on localhost:xxxxx CALL pg_catalog.citus_cleanup_orphaned_resources(); NOTICE: cleaned up 5 orphaned resources SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- (0 rows) @@ -378,17 +378,17 @@ WARNING: failed to clean up 2 orphaned shards out of 12 after a citus_split_sha ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- - 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 + 777 | 1 
| citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0 777 | 2 | citus_shard_split_subscription_xxxxxxx | 2 | 0 + 777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0 (8 rows) -- we need to allow connection so that we can connect to proxy @@ -437,7 +437,7 @@ CONTEXT: while executing command on localhost:xxxxx CALL pg_catalog.citus_cleanup_orphaned_resources(); NOTICE: cleaned up 8 orphaned resources SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- (0 rows) @@ -496,17 +496,17 @@ WARNING: failed to clean up 2 orphaned shards out of 12 after a citus_split_sha ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- - 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 + 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0 777 | 2 | citus_shard_split_subscription_xxxxxxx | 2 | 0 + 777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0 (8 rows) -- we need to allow connection so that we can connect to proxy @@ -555,7 +555,7 @@ CONTEXT: while executing command on localhost:xxxxx CALL pg_catalog.citus_cleanup_orphaned_resources(); NOTICE: cleaned up 8 orphaned resources SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- (0 rows) @@ -615,7 +615,7 @@ WARNING: connection to the remote node localhost:xxxxx failed with the followin ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1 @@ -624,8 +624,8 @@ CONTEXT: while executing command on localhost:xxxxx 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 3 | 
citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0 777 | 2 | citus_shard_split_subscription_xxxxxxx | 2 | 0 + 777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0 (8 rows) SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' order by relname; @@ -678,7 +678,7 @@ CONTEXT: while executing command on localhost:xxxxx CALL pg_catalog.citus_cleanup_orphaned_resources(); NOTICE: cleaned up 8 orphaned resources SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- (0 rows) diff --git a/src/test/regress/sql/citus_non_blocking_split_columnar.sql b/src/test/regress/sql/citus_non_blocking_split_columnar.sql index 33a622968..21639f19a 100644 --- a/src/test/regress/sql/citus_non_blocking_split_columnar.sql +++ b/src/test/regress/sql/citus_non_blocking_split_columnar.sql @@ -244,7 +244,9 @@ CALL pg_catalog.citus_cleanup_orphaned_resources(); -- END: Split a partition table directly -- BEGIN: Perform deferred cleanup. +SET client_min_messages TO WARNING; CALL pg_catalog.citus_cleanup_orphaned_resources(); +RESET client_min_messages; -- END: Perform deferred cleanup. -- BEGIN: Validate Shard Info and Data diff --git a/src/test/regress/sql/citus_non_blocking_split_shard_cleanup.sql b/src/test/regress/sql/citus_non_blocking_split_shard_cleanup.sql index 5fda60d62..dab69dfbc 100644 --- a/src/test/regress/sql/citus_non_blocking_split_shard_cleanup.sql +++ b/src/test/regress/sql/citus_non_blocking_split_shard_cleanup.sql @@ -46,6 +46,11 @@ SELECT pg_catalog.citus_split_shard_by_split_points( ARRAY[:worker_2_node, :worker_2_node], 'force_logical'); +-- BEGIN: Perform deferred cleanup. +SET client_min_messages TO WARNING; +CALL pg_catalog.citus_cleanup_orphaned_resources(); +RESET client_min_messages; +-- END: Perform deferred cleanup. \c - - - :worker_1_port SET search_path TO "citus_split_test_schema"; diff --git a/src/test/regress/sql/citus_non_blocking_split_shards.sql b/src/test/regress/sql/citus_non_blocking_split_shards.sql index a387b07c5..2109a6902 100644 --- a/src/test/regress/sql/citus_non_blocking_split_shards.sql +++ b/src/test/regress/sql/citus_non_blocking_split_shards.sql @@ -149,7 +149,9 @@ SELECT pg_catalog.citus_split_shard_by_split_points( 'force_logical'); -- BEGIN: Perform deferred cleanup. +SET client_min_messages TO WARNING; CALL pg_catalog.citus_cleanup_orphaned_resources(); +RESET client_min_messages; -- END: Perform deferred cleanup. -- Perform 3 way split @@ -161,7 +163,9 @@ SELECT pg_catalog.citus_split_shard_by_split_points( -- END : Split two shards : One with move and One without move. -- BEGIN: Perform deferred cleanup. +SET client_min_messages TO WARNING; CALL pg_catalog.citus_cleanup_orphaned_resources(); +RESET client_min_messages; -- END: Perform deferred cleanup. -- BEGIN : Move a shard post split. @@ -263,7 +267,9 @@ SELECT pg_catalog.citus_split_shard_by_split_points( ARRAY[:worker_1_node, :worker_2_node]); -- BEGIN: Perform deferred cleanup. +SET client_min_messages TO WARNING; CALL pg_catalog.citus_cleanup_orphaned_resources(); +RESET client_min_messages; -- END: Perform deferred cleanup. 
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport @@ -288,7 +294,9 @@ SELECT pg_catalog.citus_split_shard_by_split_points( 'auto'); -- BEGIN: Perform deferred cleanup. +SET client_min_messages TO WARNING; CALL pg_catalog.citus_cleanup_orphaned_resources(); +RESET client_min_messages; -- END: Perform deferred cleanup. SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport @@ -308,6 +316,11 @@ SELECT COUNT(*) FROM colocated_dist_table; --BEGIN : Cleanup \c - postgres - :master_port +-- make sure we don't have any replication objects leftover on the workers +SELECT run_command_on_workers($$SELECT count(*) FROM pg_replication_slots$$); +SELECT run_command_on_workers($$SELECT count(*) FROM pg_publication$$); +SELECT run_command_on_workers($$SELECT count(*) FROM pg_subscription$$); + ALTER SYSTEM RESET citus.defer_shard_delete_interval; SELECT pg_reload_conf(); DROP SCHEMA "citus_split_test_schema" CASCADE; diff --git a/src/test/regress/sql/failure_online_move_shard_placement.sql b/src/test/regress/sql/failure_online_move_shard_placement.sql index ce33ce6e0..31b51f1e7 100644 --- a/src/test/regress/sql/failure_online_move_shard_placement.sql +++ b/src/test/regress/sql/failure_online_move_shard_placement.sql @@ -134,7 +134,9 @@ SELECT citus.mitmproxy('conn.allow()'); SELECT run_command_on_workers($$DROP SUBSCRIPTION IF EXISTS citus_shard_move_subscription_10$$); -- cleanup leftovers -- verify we don't see any error for already dropped subscription +SET client_min_messages TO WARNING; CALL citus_cleanup_orphaned_resources(); +RESET client_min_messages; -- cancellation on dropping subscription SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").cancel(' || :pid || ')'); diff --git a/src/test/regress/sql/failure_split_cleanup.sql b/src/test/regress/sql/failure_split_cleanup.sql index 03b6cb45c..597a5f6bd 100644 --- a/src/test/regress/sql/failure_split_cleanup.sql +++ b/src/test/regress/sql/failure_split_cleanup.sql @@ -40,7 +40,7 @@ SELECT create_distributed_table('table_to_split', 'id'); ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; -- we need to allow connection so that we can connect to proxy SELECT citus.mitmproxy('conn.allow()'); @@ -60,7 +60,7 @@ SELECT create_distributed_table('table_to_split', 'id'); \c - postgres - :master_port CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; \c - - - :worker_2_proxy_port SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; @@ -92,7 +92,7 @@ SELECT create_distributed_table('table_to_split', 'id'); RESET client_min_messages; SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; -- we need to allow connection so that we can connect to proxy SELECT citus.mitmproxy('conn.allow()'); @@ -111,7 +111,7 @@ SELECT create_distributed_table('table_to_split', 'id'); \c - postgres - :master_port CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup 
where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; \c - - - :worker_2_proxy_port SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; @@ -138,7 +138,7 @@ SELECT create_distributed_table('table_to_split', 'id'); ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; -- we need to allow connection so that we can connect to proxy SELECT citus.mitmproxy('conn.allow()'); @@ -157,7 +157,7 @@ SELECT create_distributed_table('table_to_split', 'id'); \c - postgres - :master_port CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; \c - - - :worker_2_proxy_port SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; @@ -184,7 +184,7 @@ SELECT create_distributed_table('table_to_split', 'id'); ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; -- we need to allow connection so that we can connect to proxy SELECT citus.mitmproxy('conn.allow()'); @@ -203,7 +203,7 @@ SELECT create_distributed_table('table_to_split', 'id'); \c - postgres - :master_port CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; \c - - - :worker_2_proxy_port SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; @@ -230,7 +230,7 @@ SELECT create_distributed_table('table_to_split', 'id'); ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; -- we need to allow connection so that we can connect to proxy SELECT citus.mitmproxy('conn.allow()'); @@ -249,7 +249,7 @@ SELECT create_distributed_table('table_to_split', 'id'); \c - postgres - :master_port CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; \c - - - :worker_2_proxy_port SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; @@ -277,7 +277,7 @@ SELECT create_distributed_table('table_to_split', 'id'); 'force_logical'); SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' order by relname; -- we need to allow connection so that we can connect to proxy SELECT citus.mitmproxy('conn.allow()'); @@ -297,7 +297,7 @@ SELECT create_distributed_table('table_to_split', 'id'); \c - postgres - :master_port CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT operation_id, object_type, 
object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; \c - - - :worker_2_proxy_port SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; From ad6450b793f3df053e04a52a9fc59939884c383e Mon Sep 17 00:00:00 2001 From: songjinzhou <49380232+TsinghuaLucky912@users.noreply.github.com> Date: Fri, 2 Dec 2022 20:49:32 +0800 Subject: [PATCH 7/9] fix the problem #5763 (#6519) Co-authored-by: TsinghuaLucky912 Fixes https://github.com/citusdata/citus/issues/5763 --- .../commands/distribute_object_ops.c | 14 +++ src/backend/distributed/commands/owned.c | 90 +++++++++++++++++++ .../deparser/deparse_owned_stmts.c | 84 +++++++++++++++++ src/include/distributed/commands.h | 3 + src/include/distributed/deparser.h | 3 + src/test/regress/expected/issue_5763.out | 38 ++++++++ src/test/regress/expected/multi_multiuser.out | 21 ----- .../regress/expected/multi_schema_support.out | 7 -- .../expected/single_node_enterprise.out | 12 --- src/test/regress/multi_schedule | 3 +- src/test/regress/sql/issue_5763.sql | 50 +++++++++++ src/test/regress/sql/multi_multiuser.sql | 3 - src/test/regress/sql/multi_schema_support.sql | 1 - .../regress/sql/single_node_enterprise.sql | 2 - 14 files changed, 284 insertions(+), 47 deletions(-) create mode 100644 src/backend/distributed/commands/owned.c create mode 100644 src/backend/distributed/deparser/deparse_owned_stmts.c create mode 100644 src/test/regress/expected/issue_5763.out create mode 100644 src/test/regress/sql/issue_5763.sql diff --git a/src/backend/distributed/commands/distribute_object_ops.c b/src/backend/distributed/commands/distribute_object_ops.c index 056ba20e2..3f30eaaa2 100644 --- a/src/backend/distributed/commands/distribute_object_ops.c +++ b/src/backend/distributed/commands/distribute_object_ops.c @@ -254,6 +254,15 @@ static DistributeObjectOps Any_CreateRole = { .address = CreateRoleStmtObjectAddress, .markDistributed = true, }; +static DistributeObjectOps Any_DropOwned = { + .deparse = DeparseDropOwnedStmt, + .qualify = NULL, + .preprocess = PreprocessDropOwnedStmt, + .postprocess = NULL, + .operationType = DIST_OPS_DROP, + .address = NULL, + .markDistributed = false, +}; static DistributeObjectOps Any_DropRole = { .deparse = DeparseDropRoleStmt, .qualify = NULL, @@ -1658,6 +1667,11 @@ GetDistributeObjectOps(Node *node) return &Any_DropRole; } + case T_DropOwnedStmt: + { + return &Any_DropOwned; + } + case T_DropStmt: { DropStmt *stmt = castNode(DropStmt, node); diff --git a/src/backend/distributed/commands/owned.c b/src/backend/distributed/commands/owned.c new file mode 100644 index 000000000..c8f6a4bbe --- /dev/null +++ b/src/backend/distributed/commands/owned.c @@ -0,0 +1,90 @@ +/*------------------------------------------------------------------------- + * + * owned.c + * Commands for DROP OWNED statements. + * + * Copyright (c) Citus Data, Inc. 
+ * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/heapam.h" +#include "access/htup_details.h" +#include "access/genam.h" +#include "access/table.h" +#include "access/xact.h" +#include "catalog/catalog.h" +#include "catalog/pg_auth_members.h" +#include "catalog/pg_authid.h" +#include "catalog/pg_db_role_setting.h" +#include "catalog/pg_type.h" +#include "catalog/objectaddress.h" +#include "commands/dbcommands.h" +#include "distributed/citus_ruleutils.h" +#include "distributed/citus_safe_lib.h" +#include "distributed/commands.h" +#include "distributed/commands/utility_hook.h" +#include "distributed/deparser.h" +#include "distributed/listutils.h" +#include "distributed/coordinator_protocol.h" +#include "distributed/metadata/distobject.h" +#include "distributed/metadata_sync.h" +#include "distributed/metadata/distobject.h" +#include "distributed/multi_executor.h" +#include "distributed/relation_access_tracking.h" +#include "distributed/version_compat.h" +#include "distributed/worker_transaction.h" +#include "miscadmin.h" +#include "nodes/makefuncs.h" +#include "nodes/parsenodes.h" +#include "nodes/pg_list.h" +#include "parser/scansup.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/rel.h" +#include "utils/varlena.h" +#include "utils/syscache.h" + +/* + * PreprocessDropOwnedStmt finds the distributed role out of the ones + * being dropped and unmarks them distributed and creates the drop statements + * for the workers. + */ +List * +PreprocessDropOwnedStmt(Node *node, const char *queryString, + ProcessUtilityContext processUtilityContext) +{ + DropOwnedStmt *stmt = castNode(DropOwnedStmt, node); + List *allDropRoles = stmt->roles; + + List *distributedDropRoles = FilterDistributedRoles(allDropRoles); + if (list_length(distributedDropRoles) <= 0) + { + return NIL; + } + + if (!ShouldPropagate()) + { + return NIL; + } + + /* check creation against multi-statement transaction policy */ + if (!ShouldPropagateCreateInCoordinatedTransction()) + { + return NIL; + } + + EnsureCoordinator(); + + stmt->roles = distributedDropRoles; + char *sql = DeparseTreeNode((Node *) stmt); + stmt->roles = allDropRoles; + + List *commands = list_make3(DISABLE_DDL_PROPAGATION, + sql, + ENABLE_DDL_PROPAGATION); + + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); +} diff --git a/src/backend/distributed/deparser/deparse_owned_stmts.c b/src/backend/distributed/deparser/deparse_owned_stmts.c new file mode 100644 index 000000000..888071165 --- /dev/null +++ b/src/backend/distributed/deparser/deparse_owned_stmts.c @@ -0,0 +1,84 @@ +/*------------------------------------------------------------------------- + * + * deparse_owned_stmts.c + * Functions to turn all Statement structures related to owned back + * into sql. + * + * Copyright (c), Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "pg_version_compat.h" + +#include "distributed/citus_ruleutils.h" +#include "distributed/deparser.h" +#include "lib/stringinfo.h" +#include "nodes/parsenodes.h" +#include "utils/builtins.h" + +static void AppendDropOwnedStmt(StringInfo buf, DropOwnedStmt *stmt); +static void AppendRoleList(StringInfo buf, List *roleList); + +/* + * DeparseDropOwnedStmt builds and returns a string representing of the + * DropOwnedStmt for application on a remote server. 
+ */ +char * +DeparseDropOwnedStmt(Node *node) +{ + DropOwnedStmt *stmt = castNode(DropOwnedStmt, node); + + StringInfoData buf = { 0 }; + initStringInfo(&buf); + + AppendDropOwnedStmt(&buf, stmt); + + return buf.data; +} + + +/* + * AppendDropOwnedStmt generates the string representation of the + * DropOwnedStmt and appends it to the buffer. + */ +static void +AppendDropOwnedStmt(StringInfo buf, DropOwnedStmt *stmt) +{ + appendStringInfo(buf, "DROP OWNED BY "); + + AppendRoleList(buf, stmt->roles); + + if (stmt->behavior == DROP_RESTRICT) + { + appendStringInfo(buf, " RESTRICT"); + } + else if (stmt->behavior == DROP_CASCADE) + { + appendStringInfo(buf, " CASCADE"); + } +} + + +static void +AppendRoleList(StringInfo buf, List *roleList) +{ + ListCell *cell = NULL; + foreach(cell, roleList) + { + Node *roleNode = (Node *) lfirst(cell); + Assert(IsA(roleNode, RoleSpec) || IsA(roleNode, AccessPriv)); + char const *rolename = NULL; + if (IsA(roleNode, RoleSpec)) + { + rolename = RoleSpecString((RoleSpec *) roleNode, true); + } + appendStringInfoString(buf, rolename); + if (cell != list_tail(roleList)) + { + appendStringInfo(buf, ", "); + } + } +} diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index c09f077dd..fa2691fee 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -385,6 +385,9 @@ extern bool IsReindexWithParam_compat(ReindexStmt *stmt, char *paramName); extern List * CreateExtensionStmtObjectAddress(Node *stmt, bool missing_ok, bool isPostprocess); +/* owned.c - forward declarations */ +extern List * PreprocessDropOwnedStmt(Node *node, const char *queryString, + ProcessUtilityContext processUtilityContext); /* policy.c - forward declarations */ extern List * CreatePolicyCommands(Oid relationId); diff --git a/src/include/distributed/deparser.h b/src/include/distributed/deparser.h index 0d0a99e22..87704b628 100644 --- a/src/include/distributed/deparser.h +++ b/src/include/distributed/deparser.h @@ -196,6 +196,9 @@ extern char * DeparseCreateRoleStmt(Node *stmt); extern char * DeparseDropRoleStmt(Node *stmt); extern char * DeparseGrantRoleStmt(Node *stmt); +/* forward declarations for deparse_owned_stmts.c */ +extern char * DeparseDropOwnedStmt(Node *node); + /* forward declarations for deparse_extension_stmts.c */ extern DefElem * GetExtensionOption(List *extensionOptions, const char *defname); diff --git a/src/test/regress/expected/issue_5763.out b/src/test/regress/expected/issue_5763.out new file mode 100644 index 000000000..aa6c4f35b --- /dev/null +++ b/src/test/regress/expected/issue_5763.out @@ -0,0 +1,38 @@ +-- +-- ISSUE_5763 +-- +-- Issue: DROP OWNED BY fails to drop the schemas on the workers +-- Link: https://github.com/citusdata/citus/issues/5763 +-- +CREATE USER issue_5763_1 WITH SUPERUSER; +CREATE USER issue_5763_2 WITH SUPERUSER; +\c - issue_5763_1 - :master_port +CREATE SCHEMA issue_5763_sc_1; +\c - issue_5763_2 - :master_port +CREATE SCHEMA issue_5763_sc_2; +\c - postgres - :master_port +DROP OWNED BY issue_5763_1, issue_5763_2; +\c - issue_5763_1 - :master_port +CREATE SCHEMA issue_5763_sc_1; +\c - postgres - :master_port +DROP SCHEMA issue_5763_sc_1; +DROP USER issue_5763_1, issue_5763_2; +-- test CASCADE options +CREATE USER issue_5763_3 WITH SUPERUSER; +\c - issue_5763_3 - :master_port +CREATE SCHEMA issue_5763_sc_3; +CREATE TABLE issue_5763_sc_3.tb1(id int); +\c - postgres - :master_port +DROP OWNED BY issue_5763_3 CASCADE; +DROP USER issue_5763_3; +-- test non-distributed role +SET 
citus.enable_create_role_propagation TO off; +CREATE USER issue_5763_4 WITH SUPERUSER; +NOTICE: not propagating CREATE ROLE/USER commands to worker nodes +HINT: Connect to worker nodes directly to manually create all necessary users and roles. +\c - issue_5763_4 - :master_port +set citus.enable_ddl_propagation = off; +CREATE SCHEMA issue_5763_sc_4; +\c - postgres - :master_port +DROP OWNED BY issue_5763_4 RESTRICT; +DROP USER issue_5763_4; diff --git a/src/test/regress/expected/multi_multiuser.out b/src/test/regress/expected/multi_multiuser.out index b8146bf5a..d6216a961 100644 --- a/src/test/regress/expected/multi_multiuser.out +++ b/src/test/regress/expected/multi_multiuser.out @@ -545,27 +545,6 @@ DROP TABLE test, test_coloc, colocation_table; -SELECT run_command_on_workers($$DROP OWNED BY full_access$$); - run_command_on_workers ---------------------------------------------------------------------- - (localhost,57637,t,"DROP OWNED") - (localhost,57638,t,"DROP OWNED") -(2 rows) - -SELECT run_command_on_workers($$DROP OWNED BY some_role$$); - run_command_on_workers ---------------------------------------------------------------------- - (localhost,57637,t,"DROP OWNED") - (localhost,57638,t,"DROP OWNED") -(2 rows) - -SELECT run_command_on_workers($$DROP OWNED BY read_access$$); - run_command_on_workers ---------------------------------------------------------------------- - (localhost,57637,t,"DROP OWNED") - (localhost,57638,t,"DROP OWNED") -(2 rows) - DROP USER full_access; DROP USER read_access; DROP USER no_access; diff --git a/src/test/regress/expected/multi_schema_support.out b/src/test/regress/expected/multi_schema_support.out index 890663db4..31f0c3e28 100644 --- a/src/test/regress/expected/multi_schema_support.out +++ b/src/test/regress/expected/multi_schema_support.out @@ -1154,13 +1154,6 @@ SET citus.next_shard_id TO 1197000; -- we do not use run_command_on_coordinator_and_workers here because when there is CASCADE, it causes deadlock DROP OWNED BY "test-user" CASCADE; NOTICE: drop cascades to table schema_with_user.test_table -SELECT run_command_on_workers('DROP OWNED BY "test-user" CASCADE'); - run_command_on_workers ---------------------------------------------------------------------- - (localhost,57637,t,"DROP OWNED") - (localhost,57638,t,"DROP OWNED") -(2 rows) - DROP USER "test-user"; DROP FUNCTION run_command_on_coordinator_and_workers(p_sql text); -- test run_command_on_* UDFs with schema diff --git a/src/test/regress/expected/single_node_enterprise.out b/src/test/regress/expected/single_node_enterprise.out index 55eee2124..e6155ba4e 100644 --- a/src/test/regress/expected/single_node_enterprise.out +++ b/src/test/regress/expected/single_node_enterprise.out @@ -489,18 +489,6 @@ SET client_min_messages TO WARNING; DROP SCHEMA single_node_ent CASCADE; DROP OWNED BY full_access_single_node; DROP OWNED BY read_access_single_node; -SELECT run_command_on_workers($$DROP OWNED BY full_access_single_node$$); - run_command_on_workers ---------------------------------------------------------------------- - (localhost,57637,t,"DROP OWNED") -(1 row) - -SELECT run_command_on_workers($$DROP OWNED BY read_access_single_node$$); - run_command_on_workers ---------------------------------------------------------------------- - (localhost,57637,t,"DROP OWNED") -(1 row) - DROP ROLE full_access_single_node; DROP ROLE read_access_single_node; -- remove the nodes for next tests diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 1de193e1f..68b3e9cf7 100644 --- 
a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -95,7 +95,7 @@ test: multi_dropped_column_aliases foreign_key_restriction_enforcement test: binary_protocol test: alter_table_set_access_method test: alter_distributed_table -test: issue_5248 issue_5099 +test: issue_5248 issue_5099 issue_5763 test: object_propagation_debug test: undistribute_table test: run_command_on_all_nodes @@ -113,3 +113,4 @@ test: ensure_no_intermediate_data_leak # -------- test: ensure_no_shared_connection_leak test: check_mx + diff --git a/src/test/regress/sql/issue_5763.sql b/src/test/regress/sql/issue_5763.sql new file mode 100644 index 000000000..c8c146ec2 --- /dev/null +++ b/src/test/regress/sql/issue_5763.sql @@ -0,0 +1,50 @@ +-- +-- ISSUE_5763 +-- +-- Issue: DROP OWNED BY fails to drop the schemas on the workers +-- Link: https://github.com/citusdata/citus/issues/5763 +-- + +CREATE USER issue_5763_1 WITH SUPERUSER; +CREATE USER issue_5763_2 WITH SUPERUSER; + +\c - issue_5763_1 - :master_port +CREATE SCHEMA issue_5763_sc_1; + +\c - issue_5763_2 - :master_port +CREATE SCHEMA issue_5763_sc_2; + +\c - postgres - :master_port +DROP OWNED BY issue_5763_1, issue_5763_2; + +\c - issue_5763_1 - :master_port +CREATE SCHEMA issue_5763_sc_1; + +\c - postgres - :master_port +DROP SCHEMA issue_5763_sc_1; +DROP USER issue_5763_1, issue_5763_2; + +-- test CASCADE options +CREATE USER issue_5763_3 WITH SUPERUSER; + +\c - issue_5763_3 - :master_port +CREATE SCHEMA issue_5763_sc_3; +CREATE TABLE issue_5763_sc_3.tb1(id int); + +\c - postgres - :master_port +DROP OWNED BY issue_5763_3 CASCADE; + +DROP USER issue_5763_3; + +-- test non-distributed role +SET citus.enable_create_role_propagation TO off; +CREATE USER issue_5763_4 WITH SUPERUSER; + +\c - issue_5763_4 - :master_port +set citus.enable_ddl_propagation = off; +CREATE SCHEMA issue_5763_sc_4; + +\c - postgres - :master_port +DROP OWNED BY issue_5763_4 RESTRICT; + +DROP USER issue_5763_4; diff --git a/src/test/regress/sql/multi_multiuser.sql b/src/test/regress/sql/multi_multiuser.sql index ef9700413..150abe307 100644 --- a/src/test/regress/sql/multi_multiuser.sql +++ b/src/test/regress/sql/multi_multiuser.sql @@ -328,9 +328,6 @@ DROP TABLE test, test_coloc, colocation_table; -SELECT run_command_on_workers($$DROP OWNED BY full_access$$); -SELECT run_command_on_workers($$DROP OWNED BY some_role$$); -SELECT run_command_on_workers($$DROP OWNED BY read_access$$); DROP USER full_access; DROP USER read_access; DROP USER no_access; diff --git a/src/test/regress/sql/multi_schema_support.sql b/src/test/regress/sql/multi_schema_support.sql index 01873659f..944667c2a 100644 --- a/src/test/regress/sql/multi_schema_support.sql +++ b/src/test/regress/sql/multi_schema_support.sql @@ -842,7 +842,6 @@ SET citus.next_shard_id TO 1197000; -- we do not use run_command_on_coordinator_and_workers here because when there is CASCADE, it causes deadlock DROP OWNED BY "test-user" CASCADE; -SELECT run_command_on_workers('DROP OWNED BY "test-user" CASCADE'); DROP USER "test-user"; DROP FUNCTION run_command_on_coordinator_and_workers(p_sql text); diff --git a/src/test/regress/sql/single_node_enterprise.sql b/src/test/regress/sql/single_node_enterprise.sql index 9ad590bb8..76615b852 100644 --- a/src/test/regress/sql/single_node_enterprise.sql +++ b/src/test/regress/sql/single_node_enterprise.sql @@ -313,8 +313,6 @@ DROP SCHEMA single_node_ent CASCADE; DROP OWNED BY full_access_single_node; DROP OWNED BY read_access_single_node; -SELECT run_command_on_workers($$DROP OWNED BY 
full_access_single_node$$); -SELECT run_command_on_workers($$DROP OWNED BY read_access_single_node$$); DROP ROLE full_access_single_node; DROP ROLE read_access_single_node; From 6781ace3a107414da66e89108a4049df72e0d7bf Mon Sep 17 00:00:00 2001 From: aykut-bozkurt <51649454+aykut-bozkurt@users.noreply.github.com> Date: Fri, 2 Dec 2022 18:04:29 +0300 Subject: [PATCH 8/9] find core files from correct path on CI (#6535) Finds core files from correct path on CI. According to default core pattern on CI, core is generated at the location relative to binary is executed. It can be safe to set core pattern before running binary but to change a kernel param(in our case kernel.core_pattern), you need related privilege in docker container. Or you have to change it at image build. But, by default, on CI machines, kernel pattern contains a relative path to binary + pid + process name, so we do not need to set it explicitly for now. (Example core file name on CI machine: `core.2559.!usr!lib!postgresql!14!bin!postgres`) --- .circleci/config.yml | 30 ++++++++++++++++++------------ 1 file changed, 18 insertions(+), 12 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 4e952b470..2cebd86ec 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -167,8 +167,9 @@ jobs: name: 'Copy coredumps' command: | mkdir -p /tmp/core_dumps - if ls core.* 1> /dev/null 2>&1; then - cp core.* /tmp/core_dumps + core_files=( $(find . -type f -regex .*core.*\d*.*postgres) ) + if [ ${#core_files[@]} -gt 0 ]; then + cp "${core_files[@]}" /tmp/core_dumps fi when: on_fail - run: @@ -258,8 +259,9 @@ jobs: name: 'Copy coredumps' command: | mkdir -p /tmp/core_dumps - if ls core.* 1> /dev/null 2>&1; then - cp core.* /tmp/core_dumps + core_files=( $(find . -type f -regex .*core.*\d*.*postgres) ) + if [ ${#core_files[@]} -gt 0 ]; then + cp "${core_files[@]}" /tmp/core_dumps fi when: on_fail - store_artifacts: @@ -339,8 +341,9 @@ jobs: name: 'Copy coredumps' command: | mkdir -p /tmp/core_dumps - if ls core.* 1> /dev/null 2>&1; then - cp core.* /tmp/core_dumps + core_files=( $(find . -type f -regex .*core.*\d*.*postgres) ) + if [ ${#core_files[@]} -gt 0 ]; then + cp "${core_files[@]}" /tmp/core_dumps fi when: on_fail - store_artifacts: @@ -405,8 +408,9 @@ jobs: name: 'Copy coredumps' command: | mkdir -p /tmp/core_dumps - if ls core.* 1> /dev/null 2>&1; then - cp core.* /tmp/core_dumps + core_files=( $(find . -type f -regex .*core.*\d*.*postgres) ) + if [ ${#core_files[@]} -gt 0 ]; then + cp "${core_files[@]}" /tmp/core_dumps fi when: on_fail - store_artifacts: @@ -483,8 +487,9 @@ jobs: name: 'Copy coredumps' command: | mkdir -p /tmp/core_dumps - if ls core.* 1> /dev/null 2>&1; then - cp core.* /tmp/core_dumps + core_files=( $(find . -type f -regex .*core.*\d*.*postgres) ) + if [ ${#core_files[@]} -gt 0 ]; then + cp "${core_files[@]}" /tmp/core_dumps fi when: on_fail - store_artifacts: @@ -600,8 +605,9 @@ jobs: name: 'Copy coredumps' command: | mkdir -p /tmp/core_dumps - if ls core.* 1> /dev/null 2>&1; then - cp core.* /tmp/core_dumps + core_files=( $(find . 
-type f -regex .*core.*\d*.*postgres) ) + if [ ${#core_files[@]} -gt 0 ]; then + cp "${core_files[@]}" /tmp/core_dumps fi when: on_fail - store_artifacts: From 65f256eec41b127b0ebdf84e4181652401bba7fa Mon Sep 17 00:00:00 2001 From: aykut-bozkurt <51649454+aykut-bozkurt@users.noreply.github.com> Date: Fri, 2 Dec 2022 18:15:31 +0300 Subject: [PATCH 9/9] * add SIGTERM handler to gracefully terminate task executors, \ (#6473) Adds signal handlers for graceful termination, cancellation of task executors and detecting config updates. Related to PR #6459. #### How to handle termination signal? Monitor need to gracefully terminate all running task executors before terminating. Hence, we have sigterm handler for the monitor. #### How to handle cancellation signal? Monitor need to gracefully cancel all running task executors before terminating. Hence, we have sigint handler for the monitor. #### How to detect configuration changes? Monitor has SIGHUP handler to reflect configuration changes while executing tasks. --- .../distributed/utils/background_jobs.c | 266 +++++++++++++--- src/include/distributed/background_jobs.h | 1 + .../background_task_queue_monitor.out | 292 +++++++++++++++++- .../sql/background_task_queue_monitor.sql | 110 ++++++- 4 files changed, 620 insertions(+), 49 deletions(-) diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index 2032b7e65..a502b9219 100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -72,12 +72,14 @@ #define CITUS_BACKGROUND_TASK_KEY_COMMAND 2 #define CITUS_BACKGROUND_TASK_KEY_QUEUE 3 #define CITUS_BACKGROUND_TASK_KEY_TASK_ID 4 -#define CITUS_BACKGROUND_TASK_NKEYS 5 +#define CITUS_BACKGROUND_TASK_KEY_JOB_ID 5 +#define CITUS_BACKGROUND_TASK_NKEYS 6 static BackgroundWorkerHandle * StartCitusBackgroundTaskExecutor(char *database, char *user, char *command, int64 taskId, + int64 jobId, dsm_segment **pSegment); static void ExecuteSqlString(const char *sql); static shm_mq_result ConsumeTaskWorkerOutput(shm_mq_handle *responseq, StringInfo message, @@ -106,6 +108,18 @@ static TaskExecutionStatus ConsumeExecutorQueue( TaskExecutionContext *taskExecutionContext); static void TaskHadError(TaskExecutionContext *taskExecutionContext); static void TaskEnded(TaskExecutionContext *taskExecutionContext); +static void TerminateAllTaskExecutors(HTAB *currentExecutors); +static HTAB * GetRunningUniqueJobIds(HTAB *currentExecutors); +static void CancelAllTaskExecutors(HTAB *currentExecutors); +static bool MonitorGotTerminationOrCancellationRequest(); +static void QueueMonitorSigTermHandler(SIGNAL_ARGS); +static void QueueMonitorSigIntHandler(SIGNAL_ARGS); +static void QueueMonitorSigHupHandler(SIGNAL_ARGS); + +/* flags set by signal handlers */ +static volatile sig_atomic_t GotSigterm = false; +static volatile sig_atomic_t GotSigint = false; +static volatile sig_atomic_t GotSighup = false; PG_FUNCTION_INFO_V1(citus_job_cancel); PG_FUNCTION_INFO_V1(citus_job_wait); @@ -337,8 +351,6 @@ CitusBackgroundTaskQueueMonitorErrorCallback(void *arg) /* * NewExecutorExceedsCitusLimit returns true if currently we reached Citus' max worker count. - * It also sleeps 1 sec to let running tasks progress so that we have better chance to not hit - * that limit again. 
*/ static bool NewExecutorExceedsCitusLimit(QueueMonitorExecutionContext *queueMonitorExecutionContext) @@ -372,8 +384,6 @@ NewExecutorExceedsCitusLimit(QueueMonitorExecutionContext *queueMonitorExecution /* * NewExecutorExceedsPgMaxWorkers returns true if currently we reached Postgres' max worker count. - * It also sleeps 1 sec to let running tasks progress so that we have better chance to not hit - * that limit again. */ static bool NewExecutorExceedsPgMaxWorkers(BackgroundWorkerHandle *handle, @@ -389,9 +399,8 @@ NewExecutorExceedsPgMaxWorkers(BackgroundWorkerHandle *handle, */ if (queueMonitorExecutionContext->backgroundWorkerFailedStartTime == 0) { - ereport(WARNING, (errmsg( - "unable to start background worker for " - "background task execution"), + ereport(WARNING, (errmsg("unable to start background worker for " + "background task execution"), errdetail( "Current number of task " "executors: %ld/%d", @@ -432,7 +441,7 @@ AssignRunnableTaskToNewExecutor(BackgroundTask *runnableTask, dsm_segment *seg = NULL; BackgroundWorkerHandle *handle = StartCitusBackgroundTaskExecutor(databaseName, userName, runnableTask->command, - runnableTask->taskid, &seg); + runnableTask->taskid, runnableTask->jobid, &seg); MemoryContextSwitchTo(oldContext); if (NewExecutorExceedsPgMaxWorkers(handle, queueMonitorExecutionContext)) @@ -450,6 +459,7 @@ AssignRunnableTaskToNewExecutor(BackgroundTask *runnableTask, Assert(!handleEntryFound); handleEntry->handle = handle; handleEntry->seg = seg; + handleEntry->jobid = runnableTask->jobid; /* reset worker allocation timestamp and log time elapsed since the last failure */ CheckAndResetLastWorkerAllocationFailure(queueMonitorExecutionContext); @@ -741,6 +751,138 @@ TaskEnded(TaskExecutionContext *taskExecutionContext) } +/* + * QueueMonitorSigHupHandler handles SIGHUP to update monitor related config params. + */ +static void +QueueMonitorSigHupHandler(SIGNAL_ARGS) +{ + int saved_errno = errno; + + GotSighup = true; + + if (MyProc) + { + SetLatch(&MyProc->procLatch); + } + + errno = saved_errno; +} + + +/* + * MonitorGotTerminationOrCancellationRequest returns true if monitor had SIGTERM or SIGINT signals + */ +static bool +MonitorGotTerminationOrCancellationRequest() +{ + return GotSigterm || GotSigint; +} + + +/* + * QueueMonitorSigTermHandler handles SIGTERM by setting a flag to inform the monitor process + * so that it can terminate active task executors properly. It also sets the latch to awake the + * monitor if it waits on it. + */ +static void +QueueMonitorSigTermHandler(SIGNAL_ARGS) +{ + int saved_errno = errno; + + GotSigterm = true; + + if (MyProc) + { + SetLatch(&MyProc->procLatch); + } + + errno = saved_errno; +} + + +/* + * QueueMonitorSigIntHandler handles SIGINT by setting a flag to inform the monitor process + * so that it can terminate active task executors properly. It also sets the latch to awake the + * monitor if it waits on it. + */ +static void +QueueMonitorSigIntHandler(SIGNAL_ARGS) +{ + int saved_errno = errno; + + GotSigint = true; + + if (MyProc) + { + SetLatch(&MyProc->procLatch); + } + + errno = saved_errno; +} + + +/* + * TerminateAllTaskExecutors terminates task executors given in the hash map. 
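+ *
+ * The monitor calls this once when it receives SIGTERM: each entry in
+ * currentExecutors carries a BackgroundWorkerHandle, and
+ * TerminateBackgroundWorker() asks the postmaster to stop that worker. The
+ * interrupted tasks fail with a FATAL message and the retry policy later
+ * moves them back to the runnable state.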
+ */ +static void +TerminateAllTaskExecutors(HTAB *currentExecutors) +{ + HASH_SEQ_STATUS status; + BackgroundExecutorHashEntry *backgroundExecutorHashEntry; + foreach_htab(backgroundExecutorHashEntry, &status, currentExecutors) + { + TerminateBackgroundWorker(backgroundExecutorHashEntry->handle); + } +} + + +/* + * GetRunningUniqueJobIds returns unique job ids from currentExecutors + */ +static HTAB * +GetRunningUniqueJobIds(HTAB *currentExecutors) +{ + /* create a set to store unique job ids for currently executing tasks */ + HTAB *uniqueJobIds = CreateSimpleHashSetWithSize(int64, MAX_BG_TASK_EXECUTORS); + + HASH_SEQ_STATUS status; + BackgroundExecutorHashEntry *backgroundExecutorHashEntry; + foreach_htab(backgroundExecutorHashEntry, &status, currentExecutors) + { + hash_search(uniqueJobIds, &backgroundExecutorHashEntry->jobid, HASH_ENTER, NULL); + } + + return uniqueJobIds; +} + + +/* + * CancelAllTaskExecutors cancels task executors given in the hash map. + */ +static void +CancelAllTaskExecutors(HTAB *currentExecutors) +{ + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + + /* get unique job id set for running tasks in currentExecutors */ + HTAB *uniqueJobIds = GetRunningUniqueJobIds(currentExecutors); + + HASH_SEQ_STATUS status; + int64 *uniqueJobId; + foreach_htab(uniqueJobId, &status, uniqueJobIds) + { + ereport(DEBUG1, (errmsg("cancelling job: %ld", *uniqueJobId))); + Datum jobidDatum = Int64GetDatum(*uniqueJobId); + DirectFunctionCall1(citus_job_cancel, jobidDatum); + } + + PopActiveSnapshot(); + CommitTransactionCommand(); +} + + /* * CitusBackgroundTaskQueueMonitorMain is the main entry point for the background worker * running the background tasks queue monitor. @@ -758,6 +900,18 @@ TaskEnded(TaskExecutionContext *taskExecutionContext) void CitusBackgroundTaskQueueMonitorMain(Datum arg) { + /* handle SIGTERM to properly terminate active task executors */ + pqsignal(SIGTERM, QueueMonitorSigTermHandler); + + /* handle SIGINT to properly cancel active task executors */ + pqsignal(SIGINT, QueueMonitorSigIntHandler); + + /* handle SIGHUP to update MaxBackgroundTaskExecutors */ + pqsignal(SIGHUP, QueueMonitorSigHupHandler); + + /* ready to handle signals */ + BackgroundWorkerUnblockSignals(); + Oid databaseOid = DatumGetObjectId(arg); /* extension owner is passed via bgw_extra */ @@ -765,8 +919,6 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg) memcpy_s(&extensionOwner, sizeof(extensionOwner), MyBgworkerEntry->bgw_extra, sizeof(Oid)); - BackgroundWorkerUnblockSignals(); - /* connect to database, after that we can actually access catalogs */ BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner, 0); @@ -807,10 +959,6 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg) * cause conflicts on processing the tasks in the catalog table as well as violate * parallelism guarantees. To make sure there is at most, exactly one backend running * we take a session lock on the CITUS_BACKGROUND_TASK_MONITOR operation. - * - * TODO now that we have a lock, we should install a term handler to terminate any - * 'child' backend when we are terminated. Otherwise we will still have a situation - * where the actual task could be running multiple times. */ LOCKTAG tag = { 0 }; SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_BACKGROUND_TASK_MONITOR); @@ -841,11 +989,13 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg) PopActiveSnapshot(); CommitTransactionCommand(); - /* create a map to store parallel task executors */ + /* create a map to store parallel task executors. 
Persist it in monitor memory context */ + oldContext = MemoryContextSwitchTo(backgroundTaskContext); HTAB *currentExecutors = CreateSimpleHashWithNameAndSize(int64, BackgroundExecutorHashEntry, "Background Executor Hash", MAX_BG_TASK_EXECUTORS); + MemoryContextSwitchTo(oldContext); /* * monitor execution context that is useful during the monitor loop. @@ -861,6 +1011,10 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg) .ctx = backgroundTaskContext }; + /* flag to prevent duplicate termination and cancellation of task executors */ + bool terminateExecutorsStarted = false; + bool cancelExecutorsStarted = false; + /* loop exits if there is no running or runnable tasks left */ bool hasAnyTask = true; while (hasAnyTask) @@ -868,15 +1022,47 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg) /* handle signals */ CHECK_FOR_INTERRUPTS(); + /* + * if the flag is set, we should terminate all task executor workers to prevent duplicate + * runs of the same task on the next start of the monitor, which is dangerous for non-idempotent + * tasks. We do not break the loop here as we want to reflect tasks' messages. Hence, we wait until + * all tasks finish and also do not allow new runnable tasks to start running. After all current tasks + * finish, we can exit the loop safely. + */ + if (GotSigterm && !terminateExecutorsStarted) + { + ereport(LOG, (errmsg("handling termination signal"))); + terminateExecutorsStarted = true; + TerminateAllTaskExecutors(queueMonitorExecutionContext.currentExecutors); + } + + if (GotSigint && !cancelExecutorsStarted) + { + ereport(LOG, (errmsg("handling cancellation signal"))); + cancelExecutorsStarted = true; + CancelAllTaskExecutors(queueMonitorExecutionContext.currentExecutors); + } + + if (GotSighup) + { + GotSighup = false; + + /* update max_background_task_executors if changed */ + ProcessConfigFile(PGC_SIGHUP); + } + /* invalidate cache for new data in catalog */ InvalidateMetadataSystemCache(); - /* assign runnable tasks, if any, to new task executors in a transaction */ - StartTransactionCommand(); - PushActiveSnapshot(GetTransactionSnapshot()); - AssignRunnableTasks(&queueMonitorExecutionContext); - PopActiveSnapshot(); - CommitTransactionCommand(); + /* assign runnable tasks, if any, to new task executors in a transaction if we do not have SIGTERM or SIGINT */ + if (!MonitorGotTerminationOrCancellationRequest()) + { + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + AssignRunnableTasks(&queueMonitorExecutionContext); + PopActiveSnapshot(); + CommitTransactionCommand(); + } /* get running task entries from hash table */ List *runningTaskEntries = GetRunningTaskEntries( @@ -1268,7 +1454,8 @@ ConsumeTaskWorkerOutput(shm_mq_handle *responseq, StringInfo message, bool *hadE * environment to the executor. 
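 *
 * The segment's table of contents now holds six entries: the database name,
 * the user name, the command text, the shm_mq used to stream the executor's
 * output back to the monitor, the task id, and (added in this change) the
 * job id.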
*/ static dsm_segment * -StoreArgumentsInDSM(char *database, char *username, char *command, int64 taskId) +StoreArgumentsInDSM(char *database, char *username, char *command, + int64 taskId, int64 jobId) { /* * Create the shared memory that we will pass to the background @@ -1284,6 +1471,7 @@ StoreArgumentsInDSM(char *database, char *username, char *command, int64 taskId) #define QUEUE_SIZE ((Size) 65536) shm_toc_estimate_chunk(&e, QUEUE_SIZE); shm_toc_estimate_chunk(&e, sizeof(int64)); + shm_toc_estimate_chunk(&e, sizeof(int64)); shm_toc_estimate_keys(&e, CITUS_BACKGROUND_TASK_NKEYS); Size segsize = shm_toc_estimate(&e); @@ -1330,6 +1518,10 @@ StoreArgumentsInDSM(char *database, char *username, char *command, int64 taskId) *taskIdTarget = taskId; shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_TASK_ID, taskIdTarget); + int64 *jobIdTarget = shm_toc_allocate(toc, sizeof(int64)); + *jobIdTarget = jobId; + shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_JOB_ID, jobIdTarget); + shm_mq_attach(mq, seg, NULL); return seg; @@ -1343,17 +1535,17 @@ StoreArgumentsInDSM(char *database, char *username, char *command, int64 taskId) * pointer to the dynamic shared memory. */ static BackgroundWorkerHandle * -StartCitusBackgroundTaskExecutor(char *database, char *user, char *command, int64 taskId, - dsm_segment **pSegment) +StartCitusBackgroundTaskExecutor(char *database, char *user, char *command, + int64 taskId, int64 jobId, dsm_segment **pSegment) { - dsm_segment *seg = StoreArgumentsInDSM(database, user, command, taskId); + dsm_segment *seg = StoreArgumentsInDSM(database, user, command, taskId, jobId); /* Configure a worker. */ BackgroundWorker worker = { 0 }; memset(&worker, 0, sizeof(worker)); SafeSnprintf(worker.bgw_name, BGW_MAXLEN, - "Citus Background Task Queue Executor: %s/%s", - database, user); + "Citus Background Task Queue Executor: %s/%s for (%ld/%ld)", + database, user, jobId, taskId); worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; worker.bgw_start_time = BgWorkerStart_ConsistentState; @@ -1391,6 +1583,8 @@ typedef struct CitusBackgroundJobExecutorErrorCallbackContext { const char *database; const char *username; + int64 taskId; + int64 jobId; } CitusBackgroundJobExecutorErrorCallbackContext; @@ -1403,8 +1597,9 @@ CitusBackgroundJobExecutorErrorCallback(void *arg) { CitusBackgroundJobExecutorErrorCallbackContext *context = (CitusBackgroundJobExecutorErrorCallbackContext *) arg; - errcontext("Citus Background Task Queue Executor: %s/%s", context->database, - context->username); + errcontext("Citus Background Task Queue Executor: %s/%s for (%ld/%ld)", + context->database, context->username, + context->jobId, context->taskId); } @@ -1418,10 +1613,6 @@ CitusBackgroundJobExecutorErrorCallback(void *arg) void CitusBackgroundTaskExecutor(Datum main_arg) { - /* - * TODO figure out if we need this signal handler that is in pgcron - * pqsignal(SIGTERM, pg_cron_background_worker_sigterm); - */ BackgroundWorkerUnblockSignals(); /* Set up a dynamic shared memory segment. 
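 *
 * The executor attaches to the segment created by the monitor, looks up the
 * arguments by their TOC keys, and registers itself as the sender on the
 * shm_mq so its messages reach the monitor. Its application_name and error
 * context are now tagged with both the job id and the task id.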
*/ @@ -1445,6 +1636,7 @@ CitusBackgroundTaskExecutor(Datum main_arg) char *username = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_USERNAME, false); char *command = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_COMMAND, false); int64 *taskId = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_TASK_ID, false); + int64 *jobId = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_JOB_ID, false); shm_mq *mq = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_QUEUE, false); shm_mq_set_sender(mq, MyProc); @@ -1456,6 +1648,8 @@ CitusBackgroundTaskExecutor(Datum main_arg) CitusBackgroundJobExecutorErrorCallbackContext context = { .database = database, .username = username, + .taskId = *taskId, + .jobId = *jobId, }; errorCallback.callback = CitusBackgroundJobExecutorErrorCallback; errorCallback.arg = (void *) &context; @@ -1482,8 +1676,8 @@ CitusBackgroundTaskExecutor(Datum main_arg) /* Prepare to execute the query. */ SetCurrentStatementStartTimestamp(); debug_query_string = command; - char *appname = psprintf("citus background task queue executor (taskId %ld)", - *taskId); + char *appname = psprintf("citus background task queue executor (%ld/%ld)", + *jobId, *taskId); pgstat_report_appname(appname); pgstat_report_activity(STATE_RUNNING, command); StartTransactionCommand(); diff --git a/src/include/distributed/background_jobs.h b/src/include/distributed/background_jobs.h index a83928e82..75b7b982b 100644 --- a/src/include/distributed/background_jobs.h +++ b/src/include/distributed/background_jobs.h @@ -28,6 +28,7 @@ typedef struct BackgroundExecutorHashEntry BackgroundWorkerHandle *handle; dsm_segment *seg; + int64 jobid; StringInfo message; } BackgroundExecutorHashEntry; diff --git a/src/test/regress/expected/background_task_queue_monitor.out b/src/test/regress/expected/background_task_queue_monitor.out index 7278eebfc..2d528c50c 100644 --- a/src/test/regress/expected/background_task_queue_monitor.out +++ b/src/test/regress/expected/background_task_queue_monitor.out @@ -125,6 +125,12 @@ SELECT citus_job_cancel(:job_id); (1 row) +SELECT citus_job_wait(:job_id); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id; state | did_start --------------------------------------------------------------------- @@ -242,6 +248,12 @@ SELECT citus_job_cancel(:job_id2); -- when a job with 1 task is cancelled, the l (1 row) +SELECT citus_job_wait(:job_id2); -- wait for the job to be cancelled + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + SELECT citus_job_wait(:job_id3, desired_status => 'running'); citus_job_wait --------------------------------------------------------------------- @@ -278,12 +290,6 @@ SELECT citus_job_wait(:job_id1); (1 row) -SELECT citus_job_wait(:job_id2); - citus_job_wait ---------------------------------------------------------------------- - -(1 row) - SELECT citus_job_wait(:job_id3); citus_job_wait --------------------------------------------------------------------- @@ -302,11 +308,262 @@ SELECT job_id, task_id, status FROM pg_dist_background_task 9 | 15 | cancelled (5 rows) --- verify that task is not starved by currently long running task +-- verify that a task, previously not started due to lack of workers, is executed after we increase max worker count BEGIN; INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') 
RETURNING job_id AS job_id1 \gset -INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id3 \gset INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id4 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id3 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id3, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id5 \gset +COMMIT; +SELECT pg_sleep(2); -- we assume this is enough time for all tasks to be in running status except the last one due to parallel worker limit + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) + ORDER BY task_id; -- show that last task is not running but ready to run(runnable) + task_id | status +--------------------------------------------------------------------- + 16 | running + 17 | running + 18 | running + 19 | running + 20 | runnable +(5 rows) + +ALTER SYSTEM SET citus.max_background_task_executors TO 5; +SELECT pg_reload_conf(); -- the last runnable task will be running after change + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT citus_job_wait(:job_id3, desired_status => 'running'); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) + ORDER BY task_id; -- show that last task is running + task_id | status +--------------------------------------------------------------------- + 16 | running + 17 | running + 18 | running + 19 | running + 20 | running +(5 rows) + +SELECT citus_job_cancel(:job_id1); + citus_job_cancel +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_cancel(:job_id2); + citus_job_cancel +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_cancel(:job_id3); + citus_job_cancel +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id1); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id2); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id3); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, 
:task_id3, :task_id4, :task_id5) + ORDER BY task_id; -- show that all tasks are cancelled + task_id | status +--------------------------------------------------------------------- + 16 | cancelled + 17 | cancelled + 18 | cancelled + 19 | cancelled + 20 | cancelled +(5 rows) + +-- verify that upon termination signal, all tasks fail and retry policy sets their status back to runnable +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset +COMMIT; +SELECT citus_job_wait(:job_id1, desired_status => 'running'); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id2, desired_status => 'running'); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; + task_id | status +--------------------------------------------------------------------- + 21 | running + 22 | running +(2 rows) + +SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset +SELECT pg_terminate_backend(:monitor_pid); -- terminate monitor process + pg_terminate_backend +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(2); -- wait enough to show that tasks are terminated + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +SELECT task_id, status, retry_count, message FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; -- show that all tasks are runnable by retry policy after termination signal + task_id | status | retry_count | message +--------------------------------------------------------------------- + 21 | runnable | 1 | FATAL: terminating background worker "Citus Background Task Queue Executor: regression/postgres for (13/21)" due to administrator command+ + | | | CONTEXT: Citus Background Task Queue Executor: regression/postgres for (13/21) + + | | | + 22 | runnable | 1 | FATAL: terminating background worker "Citus Background Task Queue Executor: regression/postgres for (14/22)" due to administrator command+ + | | | CONTEXT: Citus Background Task Queue Executor: regression/postgres for (14/22) + + | | | +(2 rows) + +SELECT citus_job_cancel(:job_id1); + citus_job_cancel +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_cancel(:job_id2); + citus_job_cancel +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id1); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id2); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY 
task_id; -- show that all tasks are cancelled + task_id | status +--------------------------------------------------------------------- + 21 | cancelled + 22 | cancelled +(2 rows) + +-- verify that upon cancellation signal, all tasks are cancelled +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset +COMMIT; +SELECT citus_job_wait(:job_id1, desired_status => 'running'); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id2, desired_status => 'running'); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; + task_id | status +--------------------------------------------------------------------- + 23 | running + 24 | running +(2 rows) + +SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset +SELECT pg_cancel_backend(:monitor_pid); -- cancel monitor process + pg_cancel_backend +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(2); -- wait enough to show that tasks are cancelled + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id1); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id2); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; -- show that all tasks are cancelled + task_id | status +--------------------------------------------------------------------- + 23 | cancelled + 24 | cancelled +(2 rows) + +-- verify that task is not starved by currently long running task +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id2 \gset INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT 1; $job$) RETURNING task_id AS task_id2 \gset COMMIT; SELECT citus_job_wait(:job_id1, desired_status => 'running'); @@ -326,8 +583,8 @@ SELECT job_id, task_id, status FROM pg_dist_background_task ORDER BY job_id, task_id; -- show that last task is finished without starvation job_id | task_id | status --------------------------------------------------------------------- - 10 | 16 | running - 11 | 17 | done + 17 | 25 | running + 18 | 26 | done (2 rows) SELECT 
citus_job_cancel(:job_id1); @@ -336,6 +593,21 @@ SELECT citus_job_cancel(:job_id1); (1 row) +SELECT citus_job_wait(:job_id1); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT job_id, task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY job_id, task_id; -- show that task is cancelled + job_id | task_id | status +--------------------------------------------------------------------- + 17 | 25 | cancelled + 18 | 26 | done +(2 rows) + SET client_min_messages TO WARNING; DROP SCHEMA background_task_queue_monitor CASCADE; ALTER SYSTEM RESET citus.background_task_queue_interval; diff --git a/src/test/regress/sql/background_task_queue_monitor.sql b/src/test/regress/sql/background_task_queue_monitor.sql index 4319f2bf8..c04fe90d6 100644 --- a/src/test/regress/sql/background_task_queue_monitor.sql +++ b/src/test/regress/sql/background_task_queue_monitor.sql @@ -54,6 +54,7 @@ SELECT status, pid, retry_count, NOT(message = '') AS has_message, (not_before > -- test cancelling a failed/retrying job SELECT citus_job_cancel(:job_id); +SELECT citus_job_wait(:job_id); SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id; SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC; @@ -110,6 +111,7 @@ SELECT job_id, task_id, status FROM pg_dist_background_task ORDER BY job_id, task_id; -- show that last task is not running but ready to run(runnable) SELECT citus_job_cancel(:job_id2); -- when a job with 1 task is cancelled, the last runnable task will be running +SELECT citus_job_wait(:job_id2); -- wait for the job to be cancelled SELECT citus_job_wait(:job_id3, desired_status => 'running'); SELECT job_id, task_id, status FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) @@ -118,18 +120,115 @@ SELECT job_id, task_id, status FROM pg_dist_background_task SELECT citus_job_cancel(:job_id1); SELECT citus_job_cancel(:job_id3); SELECT citus_job_wait(:job_id1); -SELECT citus_job_wait(:job_id2); SELECT citus_job_wait(:job_id3); SELECT job_id, task_id, status FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) ORDER BY job_id, task_id; -- show that multiple cancels worked --- verify that task is not starved by currently long running task +-- verify that a task, previously not started due to lack of workers, is executed after we increase max worker count BEGIN; INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset -INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id3 \gset INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) 
VALUES (:job_id2, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id4 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id3 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id3, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id5 \gset +COMMIT; + +SELECT pg_sleep(2); -- we assume this is enough time for all tasks to be in running status except the last one due to parallel worker limit + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) + ORDER BY task_id; -- show that last task is not running but ready to run(runnable) + +ALTER SYSTEM SET citus.max_background_task_executors TO 5; +SELECT pg_reload_conf(); -- the last runnable task will be running after change +SELECT citus_job_wait(:job_id3, desired_status => 'running'); +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) + ORDER BY task_id; -- show that last task is running + +SELECT citus_job_cancel(:job_id1); +SELECT citus_job_cancel(:job_id2); +SELECT citus_job_cancel(:job_id3); +SELECT citus_job_wait(:job_id1); +SELECT citus_job_wait(:job_id2); +SELECT citus_job_wait(:job_id3); + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) + ORDER BY task_id; -- show that all tasks are cancelled + +-- verify that upon termination signal, all tasks fail and retry policy sets their status back to runnable +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset +COMMIT; + +SELECT citus_job_wait(:job_id1, desired_status => 'running'); +SELECT citus_job_wait(:job_id2, desired_status => 'running'); + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; + + +SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset +SELECT pg_terminate_backend(:monitor_pid); -- terminate monitor process + +SELECT pg_sleep(2); -- wait enough to show that tasks are terminated + +SELECT task_id, status, retry_count, message FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; -- show that all tasks are runnable by retry policy after termination signal + +SELECT citus_job_cancel(:job_id1); +SELECT citus_job_cancel(:job_id2); +SELECT citus_job_wait(:job_id1); +SELECT citus_job_wait(:job_id2); + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; -- show that all tasks are cancelled + +-- verify that upon cancellation signal, all tasks are cancelled +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id1 \gset +INSERT INTO 
pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset +COMMIT; + +SELECT citus_job_wait(:job_id1, desired_status => 'running'); +SELECT citus_job_wait(:job_id2, desired_status => 'running'); + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; + + +SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset +SELECT pg_cancel_backend(:monitor_pid); -- cancel monitor process + +SELECT pg_sleep(2); -- wait enough to show that tasks are cancelled + +SELECT citus_job_wait(:job_id1); +SELECT citus_job_wait(:job_id2); + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; -- show that all tasks are cancelled + +-- verify that task is not starved by currently long running task +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id2 \gset INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT 1; $job$) RETURNING task_id AS task_id2 \gset COMMIT; @@ -139,6 +238,11 @@ SELECT job_id, task_id, status FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2) ORDER BY job_id, task_id; -- show that last task is finished without starvation SELECT citus_job_cancel(:job_id1); +SELECT citus_job_wait(:job_id1); + +SELECT job_id, task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY job_id, task_id; -- show that task is cancelled SET client_min_messages TO WARNING; DROP SCHEMA background_task_queue_monitor CASCADE;
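The queue monitor changes above all follow the standard flag-and-wake signal idiom: the handler only records that a signal arrived and sets the process latch, while the main loop reacts at a safe point. As a rough standalone illustration of that idiom, here is a minimal sketch in plain POSIX C; it deliberately uses sigaction() and a sleep loop instead of the pqsignal()/SetLatch()/latch-wait APIs the actual patch uses.

```c
#include <signal.h>
#include <stdio.h>
#include <unistd.h>

/* flag set by the signal handler, checked by the main loop */
static volatile sig_atomic_t got_sigterm = 0;

static void
handle_sigterm(int signo)
{
	(void) signo;

	/* async-signal-safe: only record that the signal arrived */
	got_sigterm = 1;
}

int
main(void)
{
	struct sigaction sa = { 0 };

	sa.sa_handler = handle_sigterm;
	sigaction(SIGTERM, &sa, NULL);

	while (!got_sigterm)
	{
		/* do one unit of work; the real monitor waits on its latch here */
		sleep(1);
	}

	/* graceful shutdown path, analogous to terminating the task executors */
	printf("got SIGTERM, shutting down cleanly\n");
	return 0;
}
```

The monitor repeats the same shape for SIGTERM, SIGINT, and SIGHUP, differing only in which flag the handler sets and how the main loop reacts (terminate executors, cancel jobs, or reload the configuration).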