diff --git a/.circleci/config.yml b/.circleci/config.yml index 2ee7bbbca..f6d32c337 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -167,8 +167,9 @@ jobs: name: 'Copy coredumps' command: | mkdir -p /tmp/core_dumps - if ls core.* 1> /dev/null 2>&1; then - cp core.* /tmp/core_dumps + core_files=( $(find . -type f -regex .*core.*\d*.*postgres) ) + if [ ${#core_files[@]} -gt 0 ]; then + cp "${core_files[@]}" /tmp/core_dumps fi when: on_fail - run: @@ -258,8 +259,9 @@ jobs: name: 'Copy coredumps' command: | mkdir -p /tmp/core_dumps - if ls core.* 1> /dev/null 2>&1; then - cp core.* /tmp/core_dumps + core_files=( $(find . -type f -regex .*core.*\d*.*postgres) ) + if [ ${#core_files[@]} -gt 0 ]; then + cp "${core_files[@]}" /tmp/core_dumps fi when: on_fail - store_artifacts: @@ -339,8 +341,9 @@ jobs: name: 'Copy coredumps' command: | mkdir -p /tmp/core_dumps - if ls core.* 1> /dev/null 2>&1; then - cp core.* /tmp/core_dumps + core_files=( $(find . -type f -regex .*core.*\d*.*postgres) ) + if [ ${#core_files[@]} -gt 0 ]; then + cp "${core_files[@]}" /tmp/core_dumps fi when: on_fail - store_artifacts: @@ -405,8 +408,9 @@ jobs: name: 'Copy coredumps' command: | mkdir -p /tmp/core_dumps - if ls core.* 1> /dev/null 2>&1; then - cp core.* /tmp/core_dumps + core_files=( $(find . -type f -regex .*core.*\d*.*postgres) ) + if [ ${#core_files[@]} -gt 0 ]; then + cp "${core_files[@]}" /tmp/core_dumps fi when: on_fail - store_artifacts: @@ -483,8 +487,9 @@ jobs: name: 'Copy coredumps' command: | mkdir -p /tmp/core_dumps - if ls core.* 1> /dev/null 2>&1; then - cp core.* /tmp/core_dumps + core_files=( $(find . -type f -regex .*core.*\d*.*postgres) ) + if [ ${#core_files[@]} -gt 0 ]; then + cp "${core_files[@]}" /tmp/core_dumps fi when: on_fail - store_artifacts: @@ -627,8 +632,9 @@ jobs: name: 'Copy coredumps' command: | mkdir -p /tmp/core_dumps - if ls core.* 1> /dev/null 2>&1; then - cp core.* /tmp/core_dumps + core_files=( $(find . 
-type f -regex .*core.*\d*.*postgres) ) + if [ ${#core_files[@]} -gt 0 ]; then + cp "${core_files[@]}" /tmp/core_dumps fi when: on_fail - store_artifacts: diff --git a/.github/packaging/packaging_ignore.yml b/.github/packaging/packaging_ignore.yml new file mode 100644 index 000000000..c46f28f5e --- /dev/null +++ b/.github/packaging/packaging_ignore.yml @@ -0,0 +1,3 @@ +base: + - ".* warning: ignoring old recipe for target [`']check'" + - ".* warning: overriding recipe for target [`']check'" diff --git a/.github/packaging/validate_build_output.sh b/.github/packaging/validate_build_output.sh new file mode 100755 index 000000000..4ae588574 --- /dev/null +++ b/.github/packaging/validate_build_output.sh @@ -0,0 +1,6 @@ +package_type=${1} +git clone -b v0.8.23 --depth=1 https://github.com/citusdata/tools.git tools +python3 -m pip install -r tools/packaging_automation/requirements.txt +python3 -m tools.packaging_automation.validate_build_output --output_file output.log \ + --ignore_file .github/packaging/packaging_ignore.yml \ + --package_type ${package_type} diff --git a/.github/workflows/packaging-test-pipelines.yml b/.github/workflows/packaging-test-pipelines.yml new file mode 100644 index 000000000..f79619b21 --- /dev/null +++ b/.github/workflows/packaging-test-pipelines.yml @@ -0,0 +1,158 @@ +name: Build tests in packaging images + +on: + push: + branches: "**" + + workflow_dispatch: + +jobs: + + get_postgres_versions_from_file: + runs-on: ubuntu-latest + outputs: + pg_versions: ${{ steps.get-postgres-versions.outputs.pg_versions }} + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + fetch-depth: 2 + - name: Get Postgres Versions + id: get-postgres-versions + run: | + # Postgres versions are stored in .circleci/config.yml file in "build-[pg-version] format. Below command + # extracts the versions and get the unique values. + pg_versions=`grep -Eo 'build-[[:digit:]]{2}' .circleci/config.yml|sed -e "s/^build-//"|sort|uniq|tr '\n' ','| head -c -1` + pg_versions_array="[ ${pg_versions} ]" + echo "Supported PG Versions: ${pg_versions_array}" + # Below line is needed to set the output variable to be used in the next job + echo "pg_versions=${pg_versions_array}" >> $GITHUB_OUTPUT + + rpm_build_tests: + name: rpm_build_tests + needs: get_postgres_versions_from_file + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + # While we use separate images for different Postgres versions in rpm + # based distros + # For this reason, we need to use a "matrix" to generate names of + # rpm images, e.g. 
citus/packaging:centos-7-pg12 + packaging_docker_image: + - oraclelinux-7 + - oraclelinux-8 + - centos-7 + - centos-8 + - almalinux-9 + POSTGRES_VERSION: ${{ fromJson(needs.get_postgres_versions_from_file.outputs.pg_versions) }} + + container: + image: citus/packaging:${{ matrix.packaging_docker_image }}-pg${{ matrix.POSTGRES_VERSION }} + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Add Postgres installation directory into PATH for rpm based distros + run: | + echo "/usr/pgsql-${{ matrix.POSTGRES_VERSION }}/bin" >> $GITHUB_PATH + + - name: Configure + run: | + echo "Current Shell:$0" + echo "GCC Version: $(gcc --version)" + ./configure 2>&1 | tee output.log + + - name: Make clean + run: | + make clean + + - name: Make + run: | + make CFLAGS="-Wno-missing-braces" -sj$(cat /proc/cpuinfo | grep "core id" | wc -l) 2>&1 | tee -a output.log + + - name: Make install + run: | + make CFLAGS="-Wno-missing-braces" install 2>&1 | tee -a output.log + + - name: Validate output + env: + POSTGRES_VERSION: ${{ matrix.POSTGRES_VERSION }} + PACKAGING_DOCKER_IMAGE: ${{ matrix.packaging_docker_image }} + run: | + echo "Postgres version: ${POSTGRES_VERSION}" + + ## Install required packages to execute packaging tools for rpm based distros + yum install python3-pip python3-devel postgresql-devel -y + python3 -m pip install wheel + + ./.github/packaging/validate_build_output.sh "rpm" + + deb_build_tests: + name: deb_build_tests + needs: get_postgres_versions_from_file + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + # On deb based distros, we use the same docker image for + # builds based on different Postgres versions because deb + # based images include all postgres installations. + # For this reason, we have multiple runs --which is 3 today-- + # for each deb based image and we use POSTGRES_VERSION to set + # PG_CONFIG variable in each of those runs. 
+ packaging_docker_image: + - debian-buster-all + - debian-bookworm-all + - debian-bullseye-all + - ubuntu-bionic-all + - ubuntu-focal-all + - ubuntu-jammy-all + - ubuntu-kinetic-all + + POSTGRES_VERSION: ${{ fromJson(needs.get_postgres_versions_from_file.outputs.pg_versions) }} + + container: + image: citus/packaging:${{ matrix.packaging_docker_image }} + + steps: + - name: Checkout repository + uses: actions/checkout@v3 + + - name: Set pg_config path to related Postgres version + run: | + echo "PG_CONFIG=/usr/lib/postgresql/${{ matrix.POSTGRES_VERSION }}/bin/pg_config" >> $GITHUB_ENV + + - name: Configure + run: | + echo "Current Shell:$0" + echo "GCC Version: $(gcc --version)" + ./configure 2>&1 | tee output.log + + - name: Make clean + run: | + make clean + + - name: Make + run: | + make -sj$(cat /proc/cpuinfo | grep "core id" | wc -l) 2>&1 | tee -a output.log + + - name: Make install + run: | + make install 2>&1 | tee -a output.log + + - name: Validate output + env: + POSTGRES_VERSION: ${{ matrix.POSTGRES_VERSION }} + PACKAGING_DOCKER_IMAGE: ${{ matrix.packaging_docker_image }} + run: | + echo "Postgres version: ${POSTGRES_VERSION}" + + apt-get update -y + ## Install required packages to execute packaging tools for deb based distros + apt install python3-dev python3-pip -y + sudo apt-get purge -y python3-yaml + python3 -m pip install --upgrade pip setuptools==57.5.0 + + ./.github/packaging/validate_build_output.sh "deb" diff --git a/src/backend/columnar/README.md b/src/backend/columnar/README.md index e8681e0f3..ca3532dba 100644 --- a/src/backend/columnar/README.md +++ b/src/backend/columnar/README.md @@ -234,16 +234,14 @@ CREATE TABLE perf_columnar(LIKE perf_row) USING COLUMNAR; ## Data ```sql -CREATE OR REPLACE FUNCTION random_words(n INT4) RETURNS TEXT LANGUAGE plpython2u AS $$ -import random -t = '' -words = ['zero','one','two','three','four','five','six','seven','eight','nine','ten'] -for i in xrange(0,n): - if (i != 0): - t += ' ' - r = random.randint(0,len(words)-1) - t += words[r] -return t +CREATE OR REPLACE FUNCTION random_words(n INT4) RETURNS TEXT LANGUAGE sql AS $$ + WITH words(w) AS ( + SELECT ARRAY['zero','one','two','three','four','five','six','seven','eight','nine','ten'] + ), + random (word) AS ( + SELECT w[(random()*array_length(w, 1))::int] FROM generate_series(1, $1) AS i, words + ) + SELECT string_agg(word, ' ') FROM random; $$; ``` diff --git a/src/backend/distributed/commands/distribute_object_ops.c b/src/backend/distributed/commands/distribute_object_ops.c index 056ba20e2..3f30eaaa2 100644 --- a/src/backend/distributed/commands/distribute_object_ops.c +++ b/src/backend/distributed/commands/distribute_object_ops.c @@ -254,6 +254,15 @@ static DistributeObjectOps Any_CreateRole = { .address = CreateRoleStmtObjectAddress, .markDistributed = true, }; +static DistributeObjectOps Any_DropOwned = { + .deparse = DeparseDropOwnedStmt, + .qualify = NULL, + .preprocess = PreprocessDropOwnedStmt, + .postprocess = NULL, + .operationType = DIST_OPS_DROP, + .address = NULL, + .markDistributed = false, +}; static DistributeObjectOps Any_DropRole = { .deparse = DeparseDropRoleStmt, .qualify = NULL, @@ -1658,6 +1667,11 @@ GetDistributeObjectOps(Node *node) return &Any_DropRole; } + case T_DropOwnedStmt: + { + return &Any_DropOwned; + } + case T_DropStmt: { DropStmt *stmt = castNode(DropStmt, node); diff --git a/src/backend/distributed/commands/owned.c b/src/backend/distributed/commands/owned.c new file mode 100644 index 000000000..c8f6a4bbe --- /dev/null +++ 
b/src/backend/distributed/commands/owned.c @@ -0,0 +1,90 @@ +/*------------------------------------------------------------------------- + * + * owned.c + * Commands for DROP OWNED statements. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/heapam.h" +#include "access/htup_details.h" +#include "access/genam.h" +#include "access/table.h" +#include "access/xact.h" +#include "catalog/catalog.h" +#include "catalog/pg_auth_members.h" +#include "catalog/pg_authid.h" +#include "catalog/pg_db_role_setting.h" +#include "catalog/pg_type.h" +#include "catalog/objectaddress.h" +#include "commands/dbcommands.h" +#include "distributed/citus_ruleutils.h" +#include "distributed/citus_safe_lib.h" +#include "distributed/commands.h" +#include "distributed/commands/utility_hook.h" +#include "distributed/deparser.h" +#include "distributed/listutils.h" +#include "distributed/coordinator_protocol.h" +#include "distributed/metadata/distobject.h" +#include "distributed/metadata_sync.h" +#include "distributed/metadata/distobject.h" +#include "distributed/multi_executor.h" +#include "distributed/relation_access_tracking.h" +#include "distributed/version_compat.h" +#include "distributed/worker_transaction.h" +#include "miscadmin.h" +#include "nodes/makefuncs.h" +#include "nodes/parsenodes.h" +#include "nodes/pg_list.h" +#include "parser/scansup.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/rel.h" +#include "utils/varlena.h" +#include "utils/syscache.h" + +/* + * PreprocessDropOwnedStmt finds the distributed role out of the ones + * being dropped and unmarks them distributed and creates the drop statements + * for the workers. + */ +List * +PreprocessDropOwnedStmt(Node *node, const char *queryString, + ProcessUtilityContext processUtilityContext) +{ + DropOwnedStmt *stmt = castNode(DropOwnedStmt, node); + List *allDropRoles = stmt->roles; + + List *distributedDropRoles = FilterDistributedRoles(allDropRoles); + if (list_length(distributedDropRoles) <= 0) + { + return NIL; + } + + if (!ShouldPropagate()) + { + return NIL; + } + + /* check creation against multi-statement transaction policy */ + if (!ShouldPropagateCreateInCoordinatedTransction()) + { + return NIL; + } + + EnsureCoordinator(); + + stmt->roles = distributedDropRoles; + char *sql = DeparseTreeNode((Node *) stmt); + stmt->roles = allDropRoles; + + List *commands = list_make3(DISABLE_DDL_PROPAGATION, + sql, + ENABLE_DDL_PROPAGATION); + + return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); +} diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index 1be61cbe2..8101dc964 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -2992,6 +2992,9 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement) break; } +#if PG_VERSION_NUM >= PG_VERSION_15 + case AT_SetAccessMethod: +#endif case AT_SetNotNull: case AT_ReplicaIdentity: case AT_ChangeOwner: @@ -3007,6 +3010,7 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement) { /* * We will not perform any special check for: + * ALTER TABLE .. SET ACCESS METHOD .. * ALTER TABLE .. ALTER COLUMN .. SET NOT NULL * ALTER TABLE .. REPLICA IDENTITY .. * ALTER TABLE .. VALIDATE CONSTRAINT .. 
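
For context, a minimal SQL sketch of the behavior that the DROP OWNED support added above (Any_DropOwned, PreprocessDropOwnedStmt, and the deparser in the next file) is meant to enable. The role and table names here are hypothetical and this snippet is not part of the patch; it only illustrates the propagation path under the assumption that the role is a distributed object and DDL propagation is enabled.

```sql
-- Hypothetical illustration of the new DROP OWNED propagation.
-- When the dropped role is known to Citus as a distributed object,
-- PreprocessDropOwnedStmt deparses the statement and replays it on
-- all non-coordinator nodes, wrapped in DISABLE/ENABLE DDL propagation.
CREATE ROLE report_user;                   -- propagated to workers when role propagation is on
CREATE TABLE reports (id int);
ALTER TABLE reports OWNER TO report_user;
DROP OWNED BY report_user CASCADE;         -- with this patch, also executed on every worker
```
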
diff --git a/src/backend/distributed/deparser/deparse_owned_stmts.c b/src/backend/distributed/deparser/deparse_owned_stmts.c new file mode 100644 index 000000000..888071165 --- /dev/null +++ b/src/backend/distributed/deparser/deparse_owned_stmts.c @@ -0,0 +1,84 @@ +/*------------------------------------------------------------------------- + * + * deparse_owned_stmts.c + * Functions to turn all Statement structures related to owned back + * into sql. + * + * Copyright (c), Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "pg_version_compat.h" + +#include "distributed/citus_ruleutils.h" +#include "distributed/deparser.h" +#include "lib/stringinfo.h" +#include "nodes/parsenodes.h" +#include "utils/builtins.h" + +static void AppendDropOwnedStmt(StringInfo buf, DropOwnedStmt *stmt); +static void AppendRoleList(StringInfo buf, List *roleList); + +/* + * DeparseDropOwnedStmt builds and returns a string representing of the + * DropOwnedStmt for application on a remote server. + */ +char * +DeparseDropOwnedStmt(Node *node) +{ + DropOwnedStmt *stmt = castNode(DropOwnedStmt, node); + + StringInfoData buf = { 0 }; + initStringInfo(&buf); + + AppendDropOwnedStmt(&buf, stmt); + + return buf.data; +} + + +/* + * AppendDropOwnedStmt generates the string representation of the + * DropOwnedStmt and appends it to the buffer. + */ +static void +AppendDropOwnedStmt(StringInfo buf, DropOwnedStmt *stmt) +{ + appendStringInfo(buf, "DROP OWNED BY "); + + AppendRoleList(buf, stmt->roles); + + if (stmt->behavior == DROP_RESTRICT) + { + appendStringInfo(buf, " RESTRICT"); + } + else if (stmt->behavior == DROP_CASCADE) + { + appendStringInfo(buf, " CASCADE"); + } +} + + +static void +AppendRoleList(StringInfo buf, List *roleList) +{ + ListCell *cell = NULL; + foreach(cell, roleList) + { + Node *roleNode = (Node *) lfirst(cell); + Assert(IsA(roleNode, RoleSpec) || IsA(roleNode, AccessPriv)); + char const *rolename = NULL; + if (IsA(roleNode, RoleSpec)) + { + rolename = RoleSpecString((RoleSpec *) roleNode, true); + } + appendStringInfoString(buf, rolename); + if (cell != list_tail(roleList)) + { + appendStringInfo(buf, ", "); + } + } +} diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index edd72e71f..d133c50b2 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -2232,7 +2232,7 @@ SetWorkerColumnOptional(WorkerNode *workerNode, int columnIndex, Datum value) { /* metadata out of sync, mark the worker as not synced */ ereport(WARNING, (errmsg("Updating the metadata of the node %s:%d " - "is failed on node %s:%d." + "is failed on node %s:%d. 
" "Metadata on %s:%d is marked as out of sync.", workerNode->workerName, workerNode->workerPort, worker->workerName, worker->workerPort, diff --git a/src/backend/distributed/operations/shard_transfer.c b/src/backend/distributed/operations/shard_transfer.c index 9e777f2a1..4c20c0433 100644 --- a/src/backend/distributed/operations/shard_transfer.c +++ b/src/backend/distributed/operations/shard_transfer.c @@ -553,7 +553,7 @@ CheckSpaceConstraints(MultiConnection *connection, uint64 colocationSizeInBytes) { ereport(ERROR, (errmsg("not enough empty space on node if the shard is moved, " "actual available space after move will be %ld bytes, " - "desired available space after move is %ld bytes," + "desired available space after move is %ld bytes, " "estimated size increase on node after move is %ld bytes.", diskAvailableInBytesAfterShardMove, desiredNewDiskAvailableInBytes, colocationSizeInBytes), diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index 051c88735..0157bb545 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -604,7 +604,7 @@ CreateReplicaIdentitiesOnNode(List *shardList, char *nodeName, int32 nodePort) if (commandList != NIL) { - ereport(DEBUG1, (errmsg("Creating replica identity for shard %ld on" + ereport(DEBUG1, (errmsg("Creating replica identity for shard %ld on " "target node %s:%d", shardId, nodeName, nodePort))); SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 81ad43f38..c15268056 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1054,8 +1054,8 @@ RegisterCitusConfigVariables(void) gettext_noop( "Sets how many percentage of free disk space should be after a shard move"), gettext_noop( - "This setting controls how much free space should be available after a shard move." - "If the free disk space will be lower than this parameter, then shard move will result in" + "This setting controls how much free space should be available after a shard move. " + "If the free disk space will be lower than this parameter, then shard move will result in " "an error."), &DesiredPercentFreeAfterMove, 10.0, 0.0, 100.0, @@ -1448,7 +1448,7 @@ RegisterCitusConfigVariables(void) "parallelization"), gettext_noop("When enabled, Citus will force the executor to use " "as many connections as possible while executing a " - "parallel distributed query. If not enabled, the executor" + "parallel distributed query. If not enabled, the executor " "might choose to use less connections to optimize overall " "query execution throughput. Internally, setting this true " "will end up with using one connection per task."), @@ -1487,7 +1487,7 @@ RegisterCitusConfigVariables(void) DefineCustomBoolVariable( "citus.hide_citus_dependent_objects", gettext_noop( - "Hides some objects, which depends on citus extension, from pg meta class queries." + "Hides some objects, which depends on citus extension, from pg meta class queries. " "It is intended to be used only before postgres vanilla tests to not break them."), NULL, &HideCitusDependentObjects, @@ -1594,10 +1594,10 @@ RegisterCitusConfigVariables(void) gettext_noop("defines the behaviour when a distributed table " "is joined with a local table"), gettext_noop( - "There are 4 values available. 
The default, 'auto' will recursively plan" - "distributed tables if there is a constant filter on a unique index." - "'prefer-local' will choose local tables if possible." - "'prefer-distributed' will choose distributed tables if possible" + "There are 4 values available. The default, 'auto' will recursively plan " + "distributed tables if there is a constant filter on a unique index. " + "'prefer-local' will choose local tables if possible. " + "'prefer-distributed' will choose distributed tables if possible. " "'never' will basically skip local table joins." ), &LocalTableJoinPolicy, @@ -1816,11 +1816,11 @@ RegisterCitusConfigVariables(void) "health status are tracked in a shared hash table on " "the master node. This configuration value limits the " "size of the hash table, and consequently the maximum " - "number of worker nodes that can be tracked." + "number of worker nodes that can be tracked. " "Citus keeps some information about the worker nodes " "in the shared memory for certain optimizations. The " "optimizations are enforced up to this number of worker " - "nodes. Any additional worker nodes may not benefit from" + "nodes. Any additional worker nodes may not benefit from " "the optimizations."), &MaxWorkerNodesTracked, 2048, 1024, INT_MAX, @@ -1994,7 +1994,7 @@ RegisterCitusConfigVariables(void) gettext_noop("When enabled, the executor waits until all the connections " "are successfully established."), gettext_noop("Under some load, the executor may decide to establish some " - "extra connections to further parallelize the execution. However," + "extra connections to further parallelize the execution. However, " "before the connection establishment is done, the execution might " "have already finished. When this GUC is set to true, the execution " "waits for such connections to be established."), @@ -2092,7 +2092,7 @@ RegisterCitusConfigVariables(void) "citus.replication_model", gettext_noop("Deprecated. Please use citus.shard_replication_factor instead"), gettext_noop( - "Shard replication model is determined by the shard replication factor." + "Shard replication model is determined by the shard replication factor. " "'statement' replication is used only when the replication factor is one."), &ReplicationModel, REPLICATION_MODEL_STREAMING, @@ -2178,7 +2178,7 @@ RegisterCitusConfigVariables(void) "citus.skip_advisory_lock_permission_checks", gettext_noop("Postgres would normally enforce some " "ownership checks while acquiring locks. " - "When this setting is 'off', Citus skips" + "When this setting is 'on', Citus skips " "ownership checks on internal advisory " "locks."), NULL, @@ -2225,7 +2225,7 @@ RegisterCitusConfigVariables(void) gettext_noop("This feature is not intended for users. It is developed " "to get consistent regression test outputs. When enabled, " "the RETURNING clause returns the tuples sorted. The sort " - "is done for all the entries, starting from the first one." + "is done for all the entries, starting from the first one. " "Finally, the sorting is done in ASC order."), &SortReturning, false, @@ -2283,7 +2283,7 @@ RegisterCitusConfigVariables(void) "It means that the queries are likely to return wrong results " "unless the user is absolutely sure that pushing down the " "subquery is safe. This GUC is maintained only for backward " - "compatibility, no new users are supposed to use it. The planner" + "compatibility, no new users are supposed to use it. 
The planner " "is capable of pushing down as much computation as possible to the " "shards depending on the query."), &SubqueryPushdown, diff --git a/src/backend/distributed/utils/background_jobs.c b/src/backend/distributed/utils/background_jobs.c index 45f7a842f..a502b9219 100644 --- a/src/backend/distributed/utils/background_jobs.c +++ b/src/backend/distributed/utils/background_jobs.c @@ -72,12 +72,14 @@ #define CITUS_BACKGROUND_TASK_KEY_COMMAND 2 #define CITUS_BACKGROUND_TASK_KEY_QUEUE 3 #define CITUS_BACKGROUND_TASK_KEY_TASK_ID 4 -#define CITUS_BACKGROUND_TASK_NKEYS 5 +#define CITUS_BACKGROUND_TASK_KEY_JOB_ID 5 +#define CITUS_BACKGROUND_TASK_NKEYS 6 static BackgroundWorkerHandle * StartCitusBackgroundTaskExecutor(char *database, char *user, char *command, int64 taskId, + int64 jobId, dsm_segment **pSegment); static void ExecuteSqlString(const char *sql); static shm_mq_result ConsumeTaskWorkerOutput(shm_mq_handle *responseq, StringInfo message, @@ -106,6 +108,18 @@ static TaskExecutionStatus ConsumeExecutorQueue( TaskExecutionContext *taskExecutionContext); static void TaskHadError(TaskExecutionContext *taskExecutionContext); static void TaskEnded(TaskExecutionContext *taskExecutionContext); +static void TerminateAllTaskExecutors(HTAB *currentExecutors); +static HTAB * GetRunningUniqueJobIds(HTAB *currentExecutors); +static void CancelAllTaskExecutors(HTAB *currentExecutors); +static bool MonitorGotTerminationOrCancellationRequest(); +static void QueueMonitorSigTermHandler(SIGNAL_ARGS); +static void QueueMonitorSigIntHandler(SIGNAL_ARGS); +static void QueueMonitorSigHupHandler(SIGNAL_ARGS); + +/* flags set by signal handlers */ +static volatile sig_atomic_t GotSigterm = false; +static volatile sig_atomic_t GotSigint = false; +static volatile sig_atomic_t GotSighup = false; PG_FUNCTION_INFO_V1(citus_job_cancel); PG_FUNCTION_INFO_V1(citus_job_wait); @@ -337,8 +351,6 @@ CitusBackgroundTaskQueueMonitorErrorCallback(void *arg) /* * NewExecutorExceedsCitusLimit returns true if currently we reached Citus' max worker count. - * It also sleeps 1 sec to let running tasks progress so that we have better chance to not hit - * that limit again. */ static bool NewExecutorExceedsCitusLimit(QueueMonitorExecutionContext *queueMonitorExecutionContext) @@ -372,8 +384,6 @@ NewExecutorExceedsCitusLimit(QueueMonitorExecutionContext *queueMonitorExecution /* * NewExecutorExceedsPgMaxWorkers returns true if currently we reached Postgres' max worker count. - * It also sleeps 1 sec to let running tasks progress so that we have better chance to not hit - * that limit again. 
*/ static bool NewExecutorExceedsPgMaxWorkers(BackgroundWorkerHandle *handle, @@ -389,9 +399,8 @@ NewExecutorExceedsPgMaxWorkers(BackgroundWorkerHandle *handle, */ if (queueMonitorExecutionContext->backgroundWorkerFailedStartTime == 0) { - ereport(WARNING, (errmsg( - "unable to start background worker for " - "background task execution"), + ereport(WARNING, (errmsg("unable to start background worker for " + "background task execution"), errdetail( "Current number of task " "executors: %ld/%d", @@ -432,7 +441,7 @@ AssignRunnableTaskToNewExecutor(BackgroundTask *runnableTask, dsm_segment *seg = NULL; BackgroundWorkerHandle *handle = StartCitusBackgroundTaskExecutor(databaseName, userName, runnableTask->command, - runnableTask->taskid, &seg); + runnableTask->taskid, runnableTask->jobid, &seg); MemoryContextSwitchTo(oldContext); if (NewExecutorExceedsPgMaxWorkers(handle, queueMonitorExecutionContext)) @@ -450,6 +459,7 @@ AssignRunnableTaskToNewExecutor(BackgroundTask *runnableTask, Assert(!handleEntryFound); handleEntry->handle = handle; handleEntry->seg = seg; + handleEntry->jobid = runnableTask->jobid; /* reset worker allocation timestamp and log time elapsed since the last failure */ CheckAndResetLastWorkerAllocationFailure(queueMonitorExecutionContext); @@ -543,7 +553,7 @@ CheckAndResetLastWorkerAllocationFailure( GetCurrentTimestamp(), &secs, µsecs); ereport(LOG, (errmsg( - "able to start a background worker with %ld seconds" + "able to start a background worker with %ld seconds " "delay", secs))); queueMonitorExecutionContext->backgroundWorkerFailedStartTime = 0; @@ -741,6 +751,138 @@ TaskEnded(TaskExecutionContext *taskExecutionContext) } +/* + * QueueMonitorSigHupHandler handles SIGHUP to update monitor related config params. + */ +static void +QueueMonitorSigHupHandler(SIGNAL_ARGS) +{ + int saved_errno = errno; + + GotSighup = true; + + if (MyProc) + { + SetLatch(&MyProc->procLatch); + } + + errno = saved_errno; +} + + +/* + * MonitorGotTerminationOrCancellationRequest returns true if monitor had SIGTERM or SIGINT signals + */ +static bool +MonitorGotTerminationOrCancellationRequest() +{ + return GotSigterm || GotSigint; +} + + +/* + * QueueMonitorSigTermHandler handles SIGTERM by setting a flag to inform the monitor process + * so that it can terminate active task executors properly. It also sets the latch to awake the + * monitor if it waits on it. + */ +static void +QueueMonitorSigTermHandler(SIGNAL_ARGS) +{ + int saved_errno = errno; + + GotSigterm = true; + + if (MyProc) + { + SetLatch(&MyProc->procLatch); + } + + errno = saved_errno; +} + + +/* + * QueueMonitorSigIntHandler handles SIGINT by setting a flag to inform the monitor process + * so that it can terminate active task executors properly. It also sets the latch to awake the + * monitor if it waits on it. + */ +static void +QueueMonitorSigIntHandler(SIGNAL_ARGS) +{ + int saved_errno = errno; + + GotSigint = true; + + if (MyProc) + { + SetLatch(&MyProc->procLatch); + } + + errno = saved_errno; +} + + +/* + * TerminateAllTaskExecutors terminates task executors given in the hash map. 
+ */ +static void +TerminateAllTaskExecutors(HTAB *currentExecutors) +{ + HASH_SEQ_STATUS status; + BackgroundExecutorHashEntry *backgroundExecutorHashEntry; + foreach_htab(backgroundExecutorHashEntry, &status, currentExecutors) + { + TerminateBackgroundWorker(backgroundExecutorHashEntry->handle); + } +} + + +/* + * GetRunningUniqueJobIds returns unique job ids from currentExecutors + */ +static HTAB * +GetRunningUniqueJobIds(HTAB *currentExecutors) +{ + /* create a set to store unique job ids for currently executing tasks */ + HTAB *uniqueJobIds = CreateSimpleHashSetWithSize(int64, MAX_BG_TASK_EXECUTORS); + + HASH_SEQ_STATUS status; + BackgroundExecutorHashEntry *backgroundExecutorHashEntry; + foreach_htab(backgroundExecutorHashEntry, &status, currentExecutors) + { + hash_search(uniqueJobIds, &backgroundExecutorHashEntry->jobid, HASH_ENTER, NULL); + } + + return uniqueJobIds; +} + + +/* + * CancelAllTaskExecutors cancels task executors given in the hash map. + */ +static void +CancelAllTaskExecutors(HTAB *currentExecutors) +{ + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + + /* get unique job id set for running tasks in currentExecutors */ + HTAB *uniqueJobIds = GetRunningUniqueJobIds(currentExecutors); + + HASH_SEQ_STATUS status; + int64 *uniqueJobId; + foreach_htab(uniqueJobId, &status, uniqueJobIds) + { + ereport(DEBUG1, (errmsg("cancelling job: %ld", *uniqueJobId))); + Datum jobidDatum = Int64GetDatum(*uniqueJobId); + DirectFunctionCall1(citus_job_cancel, jobidDatum); + } + + PopActiveSnapshot(); + CommitTransactionCommand(); +} + + /* * CitusBackgroundTaskQueueMonitorMain is the main entry point for the background worker * running the background tasks queue monitor. @@ -758,6 +900,18 @@ TaskEnded(TaskExecutionContext *taskExecutionContext) void CitusBackgroundTaskQueueMonitorMain(Datum arg) { + /* handle SIGTERM to properly terminate active task executors */ + pqsignal(SIGTERM, QueueMonitorSigTermHandler); + + /* handle SIGINT to properly cancel active task executors */ + pqsignal(SIGINT, QueueMonitorSigIntHandler); + + /* handle SIGHUP to update MaxBackgroundTaskExecutors */ + pqsignal(SIGHUP, QueueMonitorSigHupHandler); + + /* ready to handle signals */ + BackgroundWorkerUnblockSignals(); + Oid databaseOid = DatumGetObjectId(arg); /* extension owner is passed via bgw_extra */ @@ -765,8 +919,6 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg) memcpy_s(&extensionOwner, sizeof(extensionOwner), MyBgworkerEntry->bgw_extra, sizeof(Oid)); - BackgroundWorkerUnblockSignals(); - /* connect to database, after that we can actually access catalogs */ BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner, 0); @@ -807,10 +959,6 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg) * cause conflicts on processing the tasks in the catalog table as well as violate * parallelism guarantees. To make sure there is at most, exactly one backend running * we take a session lock on the CITUS_BACKGROUND_TASK_MONITOR operation. - * - * TODO now that we have a lock, we should install a term handler to terminate any - * 'child' backend when we are terminated. Otherwise we will still have a situation - * where the actual task could be running multiple times. */ LOCKTAG tag = { 0 }; SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_BACKGROUND_TASK_MONITOR); @@ -841,11 +989,13 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg) PopActiveSnapshot(); CommitTransactionCommand(); - /* create a map to store parallel task executors */ + /* create a map to store parallel task executors. 
Persist it in monitor memory context */ + oldContext = MemoryContextSwitchTo(backgroundTaskContext); HTAB *currentExecutors = CreateSimpleHashWithNameAndSize(int64, BackgroundExecutorHashEntry, "Background Executor Hash", MAX_BG_TASK_EXECUTORS); + MemoryContextSwitchTo(oldContext); /* * monitor execution context that is useful during the monitor loop. @@ -861,6 +1011,10 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg) .ctx = backgroundTaskContext }; + /* flag to prevent duplicate termination and cancellation of task executors */ + bool terminateExecutorsStarted = false; + bool cancelExecutorsStarted = false; + /* loop exits if there is no running or runnable tasks left */ bool hasAnyTask = true; while (hasAnyTask) @@ -868,15 +1022,47 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg) /* handle signals */ CHECK_FOR_INTERRUPTS(); + /* + * if the flag is set, we should terminate all task executor workers to prevent duplicate + * runs of the same task on the next start of the monitor, which is dangerous for non-idempotent + * tasks. We do not break the loop here as we want to reflect tasks' messages. Hence, we wait until + * all tasks finish and also do not allow new runnable tasks to start running. After all current tasks + * finish, we can exit the loop safely. + */ + if (GotSigterm && !terminateExecutorsStarted) + { + ereport(LOG, (errmsg("handling termination signal"))); + terminateExecutorsStarted = true; + TerminateAllTaskExecutors(queueMonitorExecutionContext.currentExecutors); + } + + if (GotSigint && !cancelExecutorsStarted) + { + ereport(LOG, (errmsg("handling cancellation signal"))); + cancelExecutorsStarted = true; + CancelAllTaskExecutors(queueMonitorExecutionContext.currentExecutors); + } + + if (GotSighup) + { + GotSighup = false; + + /* update max_background_task_executors if changed */ + ProcessConfigFile(PGC_SIGHUP); + } + /* invalidate cache for new data in catalog */ InvalidateMetadataSystemCache(); - /* assign runnable tasks, if any, to new task executors in a transaction */ - StartTransactionCommand(); - PushActiveSnapshot(GetTransactionSnapshot()); - AssignRunnableTasks(&queueMonitorExecutionContext); - PopActiveSnapshot(); - CommitTransactionCommand(); + /* assign runnable tasks, if any, to new task executors in a transaction if we do not have SIGTERM or SIGINT */ + if (!MonitorGotTerminationOrCancellationRequest()) + { + StartTransactionCommand(); + PushActiveSnapshot(GetTransactionSnapshot()); + AssignRunnableTasks(&queueMonitorExecutionContext); + PopActiveSnapshot(); + CommitTransactionCommand(); + } /* get running task entries from hash table */ List *runningTaskEntries = GetRunningTaskEntries( @@ -1268,7 +1454,8 @@ ConsumeTaskWorkerOutput(shm_mq_handle *responseq, StringInfo message, bool *hadE * environment to the executor. 
*/ static dsm_segment * -StoreArgumentsInDSM(char *database, char *username, char *command, int64 taskId) +StoreArgumentsInDSM(char *database, char *username, char *command, + int64 taskId, int64 jobId) { /* * Create the shared memory that we will pass to the background @@ -1284,6 +1471,7 @@ StoreArgumentsInDSM(char *database, char *username, char *command, int64 taskId) #define QUEUE_SIZE ((Size) 65536) shm_toc_estimate_chunk(&e, QUEUE_SIZE); shm_toc_estimate_chunk(&e, sizeof(int64)); + shm_toc_estimate_chunk(&e, sizeof(int64)); shm_toc_estimate_keys(&e, CITUS_BACKGROUND_TASK_NKEYS); Size segsize = shm_toc_estimate(&e); @@ -1330,6 +1518,10 @@ StoreArgumentsInDSM(char *database, char *username, char *command, int64 taskId) *taskIdTarget = taskId; shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_TASK_ID, taskIdTarget); + int64 *jobIdTarget = shm_toc_allocate(toc, sizeof(int64)); + *jobIdTarget = jobId; + shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_JOB_ID, jobIdTarget); + shm_mq_attach(mq, seg, NULL); return seg; @@ -1343,17 +1535,17 @@ StoreArgumentsInDSM(char *database, char *username, char *command, int64 taskId) * pointer to the dynamic shared memory. */ static BackgroundWorkerHandle * -StartCitusBackgroundTaskExecutor(char *database, char *user, char *command, int64 taskId, - dsm_segment **pSegment) +StartCitusBackgroundTaskExecutor(char *database, char *user, char *command, + int64 taskId, int64 jobId, dsm_segment **pSegment) { - dsm_segment *seg = StoreArgumentsInDSM(database, user, command, taskId); + dsm_segment *seg = StoreArgumentsInDSM(database, user, command, taskId, jobId); /* Configure a worker. */ BackgroundWorker worker = { 0 }; memset(&worker, 0, sizeof(worker)); SafeSnprintf(worker.bgw_name, BGW_MAXLEN, - "Citus Background Task Queue Executor: %s/%s", - database, user); + "Citus Background Task Queue Executor: %s/%s for (%ld/%ld)", + database, user, jobId, taskId); worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; worker.bgw_start_time = BgWorkerStart_ConsistentState; @@ -1391,6 +1583,8 @@ typedef struct CitusBackgroundJobExecutorErrorCallbackContext { const char *database; const char *username; + int64 taskId; + int64 jobId; } CitusBackgroundJobExecutorErrorCallbackContext; @@ -1403,8 +1597,9 @@ CitusBackgroundJobExecutorErrorCallback(void *arg) { CitusBackgroundJobExecutorErrorCallbackContext *context = (CitusBackgroundJobExecutorErrorCallbackContext *) arg; - errcontext("Citus Background Task Queue Executor: %s/%s", context->database, - context->username); + errcontext("Citus Background Task Queue Executor: %s/%s for (%ld/%ld)", + context->database, context->username, + context->jobId, context->taskId); } @@ -1418,10 +1613,6 @@ CitusBackgroundJobExecutorErrorCallback(void *arg) void CitusBackgroundTaskExecutor(Datum main_arg) { - /* - * TODO figure out if we need this signal handler that is in pgcron - * pqsignal(SIGTERM, pg_cron_background_worker_sigterm); - */ BackgroundWorkerUnblockSignals(); /* Set up a dynamic shared memory segment. 
*/ @@ -1445,6 +1636,7 @@ CitusBackgroundTaskExecutor(Datum main_arg) char *username = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_USERNAME, false); char *command = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_COMMAND, false); int64 *taskId = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_TASK_ID, false); + int64 *jobId = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_JOB_ID, false); shm_mq *mq = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_QUEUE, false); shm_mq_set_sender(mq, MyProc); @@ -1456,6 +1648,8 @@ CitusBackgroundTaskExecutor(Datum main_arg) CitusBackgroundJobExecutorErrorCallbackContext context = { .database = database, .username = username, + .taskId = *taskId, + .jobId = *jobId, }; errorCallback.callback = CitusBackgroundJobExecutorErrorCallback; errorCallback.arg = (void *) &context; @@ -1482,8 +1676,8 @@ CitusBackgroundTaskExecutor(Datum main_arg) /* Prepare to execute the query. */ SetCurrentStatementStartTimestamp(); debug_query_string = command; - char *appname = psprintf("citus background task queue executor (taskId %ld)", - *taskId); + char *appname = psprintf("citus background task queue executor (%ld/%ld)", + *jobId, *taskId); pgstat_report_appname(appname); pgstat_report_activity(STATE_RUNNING, command); StartTransactionCommand(); diff --git a/src/bin/pg_send_cancellation/Makefile b/src/bin/pg_send_cancellation/Makefile index 7cf76757f..4515c5019 100644 --- a/src/bin/pg_send_cancellation/Makefile +++ b/src/bin/pg_send_cancellation/Makefile @@ -10,7 +10,11 @@ PG_LDFLAGS += $(LDFLAGS) include $(citus_top_builddir)/Makefile.global # We reuse all the Citus flags (incl. security flags), but we are building a program not a shared library -override CFLAGS := $(filter-out -shared,$(CFLAGS)) +# We sometimes build Citus with a newer version of gcc than Postgres was built +# with and this breaks LTO (link-time optimization). Even if disabling it can +# have some perf impact this is ok because pg_send_cancellation is only used +# for tests anyway. 
+override CFLAGS := $(filter-out -shared, $(CFLAGS)) -fno-lto # Filter out unneeded dependencies override LIBS := $(filter-out -lz -lreadline -ledit -ltermcap -lncurses -lcurses -lpam, $(LIBS)) diff --git a/src/include/distributed/background_jobs.h b/src/include/distributed/background_jobs.h index a83928e82..75b7b982b 100644 --- a/src/include/distributed/background_jobs.h +++ b/src/include/distributed/background_jobs.h @@ -28,6 +28,7 @@ typedef struct BackgroundExecutorHashEntry BackgroundWorkerHandle *handle; dsm_segment *seg; + int64 jobid; StringInfo message; } BackgroundExecutorHashEntry; diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index c09f077dd..fa2691fee 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -385,6 +385,9 @@ extern bool IsReindexWithParam_compat(ReindexStmt *stmt, char *paramName); extern List * CreateExtensionStmtObjectAddress(Node *stmt, bool missing_ok, bool isPostprocess); +/* owned.c - forward declarations */ +extern List * PreprocessDropOwnedStmt(Node *node, const char *queryString, + ProcessUtilityContext processUtilityContext); /* policy.c - forward declarations */ extern List * CreatePolicyCommands(Oid relationId); diff --git a/src/include/distributed/deparser.h b/src/include/distributed/deparser.h index 0d0a99e22..87704b628 100644 --- a/src/include/distributed/deparser.h +++ b/src/include/distributed/deparser.h @@ -196,6 +196,9 @@ extern char * DeparseCreateRoleStmt(Node *stmt); extern char * DeparseDropRoleStmt(Node *stmt); extern char * DeparseGrantRoleStmt(Node *stmt); +/* forward declarations for deparse_owned_stmts.c */ +extern char * DeparseDropOwnedStmt(Node *node); + /* forward declarations for deparse_extension_stmts.c */ extern DefElem * GetExtensionOption(List *extensionOptions, const char *defname); diff --git a/src/test/regress/expected/background_task_queue_monitor.out b/src/test/regress/expected/background_task_queue_monitor.out index 7278eebfc..2d528c50c 100644 --- a/src/test/regress/expected/background_task_queue_monitor.out +++ b/src/test/regress/expected/background_task_queue_monitor.out @@ -125,6 +125,12 @@ SELECT citus_job_cancel(:job_id); (1 row) +SELECT citus_job_wait(:job_id); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id; state | did_start --------------------------------------------------------------------- @@ -242,6 +248,12 @@ SELECT citus_job_cancel(:job_id2); -- when a job with 1 task is cancelled, the l (1 row) +SELECT citus_job_wait(:job_id2); -- wait for the job to be cancelled + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + SELECT citus_job_wait(:job_id3, desired_status => 'running'); citus_job_wait --------------------------------------------------------------------- @@ -278,12 +290,6 @@ SELECT citus_job_wait(:job_id1); (1 row) -SELECT citus_job_wait(:job_id2); - citus_job_wait ---------------------------------------------------------------------- - -(1 row) - SELECT citus_job_wait(:job_id3); citus_job_wait --------------------------------------------------------------------- @@ -302,11 +308,262 @@ SELECT job_id, task_id, status FROM pg_dist_background_task 9 | 15 | cancelled (5 rows) --- verify that task is not starved by currently long running task +-- verify that a task, previously not started due to lack of workers, is executed after 
we increase max worker count BEGIN; INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset -INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id3 \gset INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id4 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id3 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id3, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id5 \gset +COMMIT; +SELECT pg_sleep(2); -- we assume this is enough time for all tasks to be in running status except the last one due to parallel worker limit + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) + ORDER BY task_id; -- show that last task is not running but ready to run(runnable) + task_id | status +--------------------------------------------------------------------- + 16 | running + 17 | running + 18 | running + 19 | running + 20 | runnable +(5 rows) + +ALTER SYSTEM SET citus.max_background_task_executors TO 5; +SELECT pg_reload_conf(); -- the last runnable task will be running after change + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT citus_job_wait(:job_id3, desired_status => 'running'); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) + ORDER BY task_id; -- show that last task is running + task_id | status +--------------------------------------------------------------------- + 16 | running + 17 | running + 18 | running + 19 | running + 20 | running +(5 rows) + +SELECT citus_job_cancel(:job_id1); + citus_job_cancel +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_cancel(:job_id2); + citus_job_cancel +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_cancel(:job_id3); + citus_job_cancel +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id1); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id2); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id3); + citus_job_wait 
+--------------------------------------------------------------------- + +(1 row) + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) + ORDER BY task_id; -- show that all tasks are cancelled + task_id | status +--------------------------------------------------------------------- + 16 | cancelled + 17 | cancelled + 18 | cancelled + 19 | cancelled + 20 | cancelled +(5 rows) + +-- verify that upon termination signal, all tasks fail and retry policy sets their status back to runnable +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset +COMMIT; +SELECT citus_job_wait(:job_id1, desired_status => 'running'); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id2, desired_status => 'running'); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; + task_id | status +--------------------------------------------------------------------- + 21 | running + 22 | running +(2 rows) + +SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset +SELECT pg_terminate_backend(:monitor_pid); -- terminate monitor process + pg_terminate_backend +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(2); -- wait enough to show that tasks are terminated + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +SELECT task_id, status, retry_count, message FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; -- show that all tasks are runnable by retry policy after termination signal + task_id | status | retry_count | message +--------------------------------------------------------------------- + 21 | runnable | 1 | FATAL: terminating background worker "Citus Background Task Queue Executor: regression/postgres for (13/21)" due to administrator command+ + | | | CONTEXT: Citus Background Task Queue Executor: regression/postgres for (13/21) + + | | | + 22 | runnable | 1 | FATAL: terminating background worker "Citus Background Task Queue Executor: regression/postgres for (14/22)" due to administrator command+ + | | | CONTEXT: Citus Background Task Queue Executor: regression/postgres for (14/22) + + | | | +(2 rows) + +SELECT citus_job_cancel(:job_id1); + citus_job_cancel +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_cancel(:job_id2); + citus_job_cancel +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id1); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id2); + citus_job_wait 
+--------------------------------------------------------------------- + +(1 row) + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; -- show that all tasks are cancelled + task_id | status +--------------------------------------------------------------------- + 21 | cancelled + 22 | cancelled +(2 rows) + +-- verify that upon cancellation signal, all tasks are cancelled +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset +COMMIT; +SELECT citus_job_wait(:job_id1, desired_status => 'running'); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id2, desired_status => 'running'); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; + task_id | status +--------------------------------------------------------------------- + 23 | running + 24 | running +(2 rows) + +SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset +SELECT pg_cancel_backend(:monitor_pid); -- cancel monitor process + pg_cancel_backend +--------------------------------------------------------------------- + t +(1 row) + +SELECT pg_sleep(2); -- wait enough to show that tasks are cancelled + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id1); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT citus_job_wait(:job_id2); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; -- show that all tasks are cancelled + task_id | status +--------------------------------------------------------------------- + 23 | cancelled + 24 | cancelled +(2 rows) + +-- verify that task is not starved by currently long running task +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id2 \gset INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT 1; $job$) RETURNING task_id AS task_id2 \gset COMMIT; SELECT citus_job_wait(:job_id1, desired_status => 'running'); @@ -326,8 +583,8 @@ SELECT job_id, task_id, status FROM pg_dist_background_task ORDER BY job_id, task_id; -- show that last task is finished without starvation 
job_id | task_id | status --------------------------------------------------------------------- - 10 | 16 | running - 11 | 17 | done + 17 | 25 | running + 18 | 26 | done (2 rows) SELECT citus_job_cancel(:job_id1); @@ -336,6 +593,21 @@ SELECT citus_job_cancel(:job_id1); (1 row) +SELECT citus_job_wait(:job_id1); + citus_job_wait +--------------------------------------------------------------------- + +(1 row) + +SELECT job_id, task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY job_id, task_id; -- show that task is cancelled + job_id | task_id | status +--------------------------------------------------------------------- + 17 | 25 | cancelled + 18 | 26 | done +(2 rows) + SET client_min_messages TO WARNING; DROP SCHEMA background_task_queue_monitor CASCADE; ALTER SYSTEM RESET citus.background_task_queue_interval; diff --git a/src/test/regress/expected/citus_non_blocking_split_columnar.out b/src/test/regress/expected/citus_non_blocking_split_columnar.out index 3098b98c1..d1ce4d6a7 100644 --- a/src/test/regress/expected/citus_non_blocking_split_columnar.out +++ b/src/test/regress/expected/citus_non_blocking_split_columnar.out @@ -525,8 +525,9 @@ NOTICE: cleaned up 11 orphaned resources -- END: Split a partition table directly -- BEGIN: Perform deferred cleanup. +SET client_min_messages TO WARNING; CALL pg_catalog.citus_cleanup_orphaned_resources(); -NOTICE: cleaned up 11 orphaned resources +RESET client_min_messages; -- END: Perform deferred cleanup. -- BEGIN: Validate Shard Info and Data SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport diff --git a/src/test/regress/expected/citus_non_blocking_split_shard_cleanup.out b/src/test/regress/expected/citus_non_blocking_split_shard_cleanup.out index bf6467779..d7a1bc47d 100644 --- a/src/test/regress/expected/citus_non_blocking_split_shard_cleanup.out +++ b/src/test/regress/expected/citus_non_blocking_split_shard_cleanup.out @@ -47,6 +47,11 @@ SELECT pg_catalog.citus_split_shard_by_split_points( (1 row) +-- BEGIN: Perform deferred cleanup. +SET client_min_messages TO WARNING; +CALL pg_catalog.citus_cleanup_orphaned_resources(); +RESET client_min_messages; +-- END: Perform deferred cleanup. \c - - - :worker_1_port SET search_path TO "citus_split_test_schema"; SET citus.show_shards_for_app_name_prefixes = '*'; diff --git a/src/test/regress/expected/citus_non_blocking_split_shards.out b/src/test/regress/expected/citus_non_blocking_split_shards.out index ad0afd3e8..f08f9c428 100644 --- a/src/test/regress/expected/citus_non_blocking_split_shards.out +++ b/src/test/regress/expected/citus_non_blocking_split_shards.out @@ -237,8 +237,9 @@ SELECT pg_catalog.citus_split_shard_by_split_points( (1 row) -- BEGIN: Perform deferred cleanup. +SET client_min_messages TO WARNING; CALL pg_catalog.citus_cleanup_orphaned_resources(); -NOTICE: cleaned up 3 orphaned resources +RESET client_min_messages; -- END: Perform deferred cleanup. -- Perform 3 way split SELECT pg_catalog.citus_split_shard_by_split_points( @@ -253,8 +254,9 @@ SELECT pg_catalog.citus_split_shard_by_split_points( -- END : Split two shards : One with move and One without move. -- BEGIN: Perform deferred cleanup. +SET client_min_messages TO WARNING; CALL pg_catalog.citus_cleanup_orphaned_resources(); -NOTICE: cleaned up 3 orphaned resources +RESET client_min_messages; -- END: Perform deferred cleanup. -- BEGIN : Move a shard post split. 
SELECT citus_move_shard_placement(8981007, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='block_writes'); @@ -474,7 +476,9 @@ ERROR: cannot use logical replication to transfer shards of the relation table_ DETAIL: UPDATE and DELETE commands on the shard will error out during logical replication unless there is a REPLICA IDENTITY or PRIMARY KEY. HINT: If you wish to continue without a replica identity set the shard_transfer_mode to 'force_logical' or 'block_writes'. -- BEGIN: Perform deferred cleanup. +SET client_min_messages TO WARNING; CALL pg_catalog.citus_cleanup_orphaned_resources(); +RESET client_min_messages; -- END: Perform deferred cleanup. SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport FROM pg_dist_shard AS shard @@ -522,8 +526,9 @@ SELECT pg_catalog.citus_split_shard_by_split_points( (1 row) -- BEGIN: Perform deferred cleanup. +SET client_min_messages TO WARNING; CALL pg_catalog.citus_cleanup_orphaned_resources(); -NOTICE: cleaned up 3 orphaned resources +RESET client_min_messages; -- END: Perform deferred cleanup. SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport FROM pg_dist_shard AS shard @@ -580,6 +585,28 @@ SELECT COUNT(*) FROM colocated_dist_table; -- END: Validate Data Count --BEGIN : Cleanup \c - postgres - :master_port +-- make sure we don't have any replication objects leftover on the workers +SELECT run_command_on_workers($$SELECT count(*) FROM pg_replication_slots$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,0) + (localhost,57638,t,0) +(2 rows) + +SELECT run_command_on_workers($$SELECT count(*) FROM pg_publication$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,0) + (localhost,57638,t,0) +(2 rows) + +SELECT run_command_on_workers($$SELECT count(*) FROM pg_subscription$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,0) + (localhost,57638,t,0) +(2 rows) + ALTER SYSTEM RESET citus.defer_shard_delete_interval; SELECT pg_reload_conf(); pg_reload_conf diff --git a/src/test/regress/expected/failure_online_move_shard_placement.out b/src/test/regress/expected/failure_online_move_shard_placement.out index 15684a3b1..71a3b2527 100644 --- a/src/test/regress/expected/failure_online_move_shard_placement.out +++ b/src/test/regress/expected/failure_online_move_shard_placement.out @@ -325,8 +325,9 @@ SELECT run_command_on_workers($$DROP SUBSCRIPTION IF EXISTS citus_shard_move_sub -- cleanup leftovers -- verify we don't see any error for already dropped subscription +SET client_min_messages TO WARNING; CALL citus_cleanup_orphaned_resources(); -NOTICE: cleaned up 3 orphaned resources +RESET client_min_messages; -- cancellation on dropping subscription SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").cancel(' || :pid || ')'); mitmproxy diff --git a/src/test/regress/expected/failure_split_cleanup.out b/src/test/regress/expected/failure_split_cleanup.out index 5765d3e4f..ce06f4978 100644 --- a/src/test/regress/expected/failure_split_cleanup.out +++ b/src/test/regress/expected/failure_split_cleanup.out @@ -52,11 +52,11 @@ WARNING: failed to clean up 2 orphaned shards out of 5 after a citus_split_shar ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx SELECT operation_id, object_type, object_name, node_group_id, policy_type - 
FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- - 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 + 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 (3 rows) @@ -101,7 +101,7 @@ CONTEXT: while executing command on localhost:xxxxx CALL pg_catalog.citus_cleanup_orphaned_resources(); NOTICE: cleaned up 3 orphaned resources SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- (0 rows) @@ -155,11 +155,11 @@ NOTICE: cleaned up 3 orphaned resources ERROR: Failed to run worker_split_shard_replication_setup UDF. It should successfully execute for splitting a shard in a non-blocking way. Please retry. RESET client_min_messages; SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- - 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 + 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 (4 rows) @@ -207,7 +207,7 @@ ERROR: Failed to run worker_split_shard_replication_setup UDF. 
It should succes CALL pg_catalog.citus_cleanup_orphaned_resources(); NOTICE: cleaned up 4 orphaned resources SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- (0 rows) @@ -266,11 +266,11 @@ WARNING: failed to clean up 2 orphaned shards out of 7 after a citus_split_shar ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- - 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 + 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 @@ -319,7 +319,7 @@ CONTEXT: while executing command on localhost:xxxxx CALL pg_catalog.citus_cleanup_orphaned_resources(); NOTICE: cleaned up 5 orphaned resources SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- (0 rows) @@ -378,17 +378,17 @@ WARNING: failed to clean up 2 orphaned shards out of 12 after a citus_split_sha ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- - 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 + 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0 777 | 2 | citus_shard_split_subscription_xxxxxxx | 2 | 0 + 777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0 (8 rows) -- we need to allow connection so that we can connect to proxy @@ -437,7 +437,7 @@ CONTEXT: while executing command on localhost:xxxxx CALL pg_catalog.citus_cleanup_orphaned_resources(); NOTICE: cleaned up 8 orphaned resources SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; operation_id | object_type | object_name | node_group_id | policy_type 
--------------------------------------------------------------------- (0 rows) @@ -496,17 +496,17 @@ WARNING: failed to clean up 2 orphaned shards out of 12 after a citus_split_sha ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- - 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0 + 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0 777 | 2 | citus_shard_split_subscription_xxxxxxx | 2 | 0 + 777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0 (8 rows) -- we need to allow connection so that we can connect to proxy @@ -555,7 +555,7 @@ CONTEXT: while executing command on localhost:xxxxx CALL pg_catalog.citus_cleanup_orphaned_resources(); NOTICE: cleaned up 8 orphaned resources SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- (0 rows) @@ -615,7 +615,7 @@ WARNING: connection to the remote node localhost:xxxxx failed with the followin ERROR: connection not open CONTEXT: while executing command on localhost:xxxxx SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- 777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1 @@ -624,8 +624,8 @@ CONTEXT: while executing command on localhost:xxxxx 777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0 - 777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0 777 | 2 | citus_shard_split_subscription_xxxxxxx | 2 | 0 + 777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0 (8 rows) SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' order by relname; @@ -678,7 +678,7 @@ CONTEXT: while executing command on localhost:xxxxx CALL pg_catalog.citus_cleanup_orphaned_resources(); NOTICE: cleaned up 8 orphaned resources SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; operation_id | object_type | object_name | node_group_id | policy_type --------------------------------------------------------------------- (0 rows) diff --git a/src/test/regress/expected/issue_5763.out b/src/test/regress/expected/issue_5763.out 
new file mode 100644 index 000000000..aa6c4f35b --- /dev/null +++ b/src/test/regress/expected/issue_5763.out @@ -0,0 +1,38 @@ +-- +-- ISSUE_5763 +-- +-- Issue: DROP OWNED BY fails to drop the schemas on the workers +-- Link: https://github.com/citusdata/citus/issues/5763 +-- +CREATE USER issue_5763_1 WITH SUPERUSER; +CREATE USER issue_5763_2 WITH SUPERUSER; +\c - issue_5763_1 - :master_port +CREATE SCHEMA issue_5763_sc_1; +\c - issue_5763_2 - :master_port +CREATE SCHEMA issue_5763_sc_2; +\c - postgres - :master_port +DROP OWNED BY issue_5763_1, issue_5763_2; +\c - issue_5763_1 - :master_port +CREATE SCHEMA issue_5763_sc_1; +\c - postgres - :master_port +DROP SCHEMA issue_5763_sc_1; +DROP USER issue_5763_1, issue_5763_2; +-- test CASCADE options +CREATE USER issue_5763_3 WITH SUPERUSER; +\c - issue_5763_3 - :master_port +CREATE SCHEMA issue_5763_sc_3; +CREATE TABLE issue_5763_sc_3.tb1(id int); +\c - postgres - :master_port +DROP OWNED BY issue_5763_3 CASCADE; +DROP USER issue_5763_3; +-- test non-distributed role +SET citus.enable_create_role_propagation TO off; +CREATE USER issue_5763_4 WITH SUPERUSER; +NOTICE: not propagating CREATE ROLE/USER commands to worker nodes +HINT: Connect to worker nodes directly to manually create all necessary users and roles. +\c - issue_5763_4 - :master_port +set citus.enable_ddl_propagation = off; +CREATE SCHEMA issue_5763_sc_4; +\c - postgres - :master_port +DROP OWNED BY issue_5763_4 RESTRICT; +DROP USER issue_5763_4; diff --git a/src/test/regress/expected/multi_multiuser.out b/src/test/regress/expected/multi_multiuser.out index b8146bf5a..d6216a961 100644 --- a/src/test/regress/expected/multi_multiuser.out +++ b/src/test/regress/expected/multi_multiuser.out @@ -545,27 +545,6 @@ DROP TABLE test, test_coloc, colocation_table; -SELECT run_command_on_workers($$DROP OWNED BY full_access$$); - run_command_on_workers ---------------------------------------------------------------------- - (localhost,57637,t,"DROP OWNED") - (localhost,57638,t,"DROP OWNED") -(2 rows) - -SELECT run_command_on_workers($$DROP OWNED BY some_role$$); - run_command_on_workers ---------------------------------------------------------------------- - (localhost,57637,t,"DROP OWNED") - (localhost,57638,t,"DROP OWNED") -(2 rows) - -SELECT run_command_on_workers($$DROP OWNED BY read_access$$); - run_command_on_workers ---------------------------------------------------------------------- - (localhost,57637,t,"DROP OWNED") - (localhost,57638,t,"DROP OWNED") -(2 rows) - DROP USER full_access; DROP USER read_access; DROP USER no_access; diff --git a/src/test/regress/expected/multi_schema_support.out b/src/test/regress/expected/multi_schema_support.out index 890663db4..31f0c3e28 100644 --- a/src/test/regress/expected/multi_schema_support.out +++ b/src/test/regress/expected/multi_schema_support.out @@ -1154,13 +1154,6 @@ SET citus.next_shard_id TO 1197000; -- we do not use run_command_on_coordinator_and_workers here because when there is CASCADE, it causes deadlock DROP OWNED BY "test-user" CASCADE; NOTICE: drop cascades to table schema_with_user.test_table -SELECT run_command_on_workers('DROP OWNED BY "test-user" CASCADE'); - run_command_on_workers ---------------------------------------------------------------------- - (localhost,57637,t,"DROP OWNED") - (localhost,57638,t,"DROP OWNED") -(2 rows) - DROP USER "test-user"; DROP FUNCTION run_command_on_coordinator_and_workers(p_sql text); -- test run_command_on_* UDFs with schema diff --git a/src/test/regress/expected/pg15.out 
b/src/test/regress/expected/pg15.out index be14f70a7..5dcd2c04d 100644 --- a/src/test/regress/expected/pg15.out +++ b/src/test/regress/expected/pg15.out @@ -1456,6 +1456,36 @@ NOTICE: drop cascades to 2 other objects CREATE DATABASE db_with_oid OID 987654; NOTICE: Citus partially supports CREATE DATABASE for distributed databases DROP DATABASE db_with_oid; +-- SET ACCESS METHOD +-- Create a heap2 table am handler with heapam handler +CREATE ACCESS METHOD heap2 TYPE TABLE HANDLER heap_tableam_handler; +SELECT run_command_on_workers($$CREATE ACCESS METHOD heap2 TYPE TABLE HANDLER heap_tableam_handler$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,"CREATE ACCESS METHOD") + (localhost,57638,t,"CREATE ACCESS METHOD") +(2 rows) + +CREATE TABLE mx_ddl_table2 ( + key int primary key, + value int +); +SELECT create_distributed_table('mx_ddl_table2', 'key', 'hash', shard_count=> 4); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +ALTER TABLE mx_ddl_table2 SET ACCESS METHOD heap2; +DROP TABLE mx_ddl_table2; +DROP ACCESS METHOD heap2; +SELECT run_command_on_workers($$DROP ACCESS METHOD heap2$$); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,"DROP ACCESS METHOD") + (localhost,57638,t,"DROP ACCESS METHOD") +(2 rows) + -- Clean up \set VERBOSITY terse SET client_min_messages TO ERROR; diff --git a/src/test/regress/expected/single_node_enterprise.out b/src/test/regress/expected/single_node_enterprise.out index 55eee2124..e6155ba4e 100644 --- a/src/test/regress/expected/single_node_enterprise.out +++ b/src/test/regress/expected/single_node_enterprise.out @@ -489,18 +489,6 @@ SET client_min_messages TO WARNING; DROP SCHEMA single_node_ent CASCADE; DROP OWNED BY full_access_single_node; DROP OWNED BY read_access_single_node; -SELECT run_command_on_workers($$DROP OWNED BY full_access_single_node$$); - run_command_on_workers ---------------------------------------------------------------------- - (localhost,57637,t,"DROP OWNED") -(1 row) - -SELECT run_command_on_workers($$DROP OWNED BY read_access_single_node$$); - run_command_on_workers ---------------------------------------------------------------------- - (localhost,57637,t,"DROP OWNED") -(1 row) - DROP ROLE full_access_single_node; DROP ROLE read_access_single_node; -- remove the nodes for next tests diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 1de193e1f..68b3e9cf7 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -95,7 +95,7 @@ test: multi_dropped_column_aliases foreign_key_restriction_enforcement test: binary_protocol test: alter_table_set_access_method test: alter_distributed_table -test: issue_5248 issue_5099 +test: issue_5248 issue_5099 issue_5763 test: object_propagation_debug test: undistribute_table test: run_command_on_all_nodes @@ -113,3 +113,4 @@ test: ensure_no_intermediate_data_leak # -------- test: ensure_no_shared_connection_leak test: check_mx + diff --git a/src/test/regress/sql/background_task_queue_monitor.sql b/src/test/regress/sql/background_task_queue_monitor.sql index 4319f2bf8..c04fe90d6 100644 --- a/src/test/regress/sql/background_task_queue_monitor.sql +++ b/src/test/regress/sql/background_task_queue_monitor.sql @@ -54,6 +54,7 @@ SELECT status, pid, retry_count, NOT(message = '') AS has_message, (not_before > -- test cancelling a failed/retrying job 
SELECT citus_job_cancel(:job_id); +SELECT citus_job_wait(:job_id); SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id; SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC; @@ -110,6 +111,7 @@ SELECT job_id, task_id, status FROM pg_dist_background_task ORDER BY job_id, task_id; -- show that last task is not running but ready to run(runnable) SELECT citus_job_cancel(:job_id2); -- when a job with 1 task is cancelled, the last runnable task will be running +SELECT citus_job_wait(:job_id2); -- wait for the job to be cancelled SELECT citus_job_wait(:job_id3, desired_status => 'running'); SELECT job_id, task_id, status FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) @@ -118,18 +120,115 @@ SELECT job_id, task_id, status FROM pg_dist_background_task SELECT citus_job_cancel(:job_id1); SELECT citus_job_cancel(:job_id3); SELECT citus_job_wait(:job_id1); -SELECT citus_job_wait(:job_id2); SELECT citus_job_wait(:job_id3); SELECT job_id, task_id, status FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) ORDER BY job_id, task_id; -- show that multiple cancels worked --- verify that task is not starved by currently long running task +-- verify that a task, previously not started due to lack of workers, is executed after we increase max worker count BEGIN; INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset -INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id3 \gset INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id4 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id3 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id3, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id5 \gset +COMMIT; + +SELECT pg_sleep(2); -- we assume this is enough time for all tasks to be in running status except the last one due to parallel worker limit + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) + ORDER BY task_id; -- show that last task is not running but ready to run(runnable) + +ALTER SYSTEM SET citus.max_background_task_executors TO 5; +SELECT pg_reload_conf(); -- the last runnable task will be running after change +SELECT citus_job_wait(:job_id3, desired_status => 'running'); +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) 
+ ORDER BY task_id; -- show that last task is running + +SELECT citus_job_cancel(:job_id1); +SELECT citus_job_cancel(:job_id2); +SELECT citus_job_cancel(:job_id3); +SELECT citus_job_wait(:job_id1); +SELECT citus_job_wait(:job_id2); +SELECT citus_job_wait(:job_id3); + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5) + ORDER BY task_id; -- show that all tasks are cancelled + +-- verify that upon termination signal, all tasks fail and retry policy sets their status back to runnable +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset +COMMIT; + +SELECT citus_job_wait(:job_id1, desired_status => 'running'); +SELECT citus_job_wait(:job_id2, desired_status => 'running'); + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; + + +SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset +SELECT pg_terminate_backend(:monitor_pid); -- terminate monitor process + +SELECT pg_sleep(2); -- wait enough to show that tasks are terminated + +SELECT task_id, status, retry_count, message FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; -- show that all tasks are runnable by retry policy after termination signal + +SELECT citus_job_cancel(:job_id1); +SELECT citus_job_cancel(:job_id2); +SELECT citus_job_wait(:job_id1); +SELECT citus_job_wait(:job_id2); + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; -- show that all tasks are cancelled + +-- verify that upon cancellation signal, all tasks are cancelled +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id2 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset +COMMIT; + +SELECT citus_job_wait(:job_id1, desired_status => 'running'); +SELECT citus_job_wait(:job_id2, desired_status => 'running'); + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY task_id; + + +SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset +SELECT pg_cancel_backend(:monitor_pid); -- cancel monitor process + +SELECT pg_sleep(2); -- wait enough to show that tasks are cancelled + +SELECT citus_job_wait(:job_id1); +SELECT citus_job_wait(:job_id2); + +SELECT task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + 
ORDER BY task_id; -- show that all tasks are cancelled + +-- verify that task is not starved by currently long running task +BEGIN; +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id1 \gset +INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset +INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id2 \gset INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT 1; $job$) RETURNING task_id AS task_id2 \gset COMMIT; @@ -139,6 +238,11 @@ SELECT job_id, task_id, status FROM pg_dist_background_task WHERE task_id IN (:task_id1, :task_id2) ORDER BY job_id, task_id; -- show that last task is finished without starvation SELECT citus_job_cancel(:job_id1); +SELECT citus_job_wait(:job_id1); + +SELECT job_id, task_id, status FROM pg_dist_background_task + WHERE task_id IN (:task_id1, :task_id2) + ORDER BY job_id, task_id; -- show that task is cancelled SET client_min_messages TO WARNING; DROP SCHEMA background_task_queue_monitor CASCADE; diff --git a/src/test/regress/sql/citus_non_blocking_split_columnar.sql b/src/test/regress/sql/citus_non_blocking_split_columnar.sql index 33a622968..21639f19a 100644 --- a/src/test/regress/sql/citus_non_blocking_split_columnar.sql +++ b/src/test/regress/sql/citus_non_blocking_split_columnar.sql @@ -244,7 +244,9 @@ CALL pg_catalog.citus_cleanup_orphaned_resources(); -- END: Split a partition table directly -- BEGIN: Perform deferred cleanup. +SET client_min_messages TO WARNING; CALL pg_catalog.citus_cleanup_orphaned_resources(); +RESET client_min_messages; -- END: Perform deferred cleanup. -- BEGIN: Validate Shard Info and Data diff --git a/src/test/regress/sql/citus_non_blocking_split_shard_cleanup.sql b/src/test/regress/sql/citus_non_blocking_split_shard_cleanup.sql index 5fda60d62..dab69dfbc 100644 --- a/src/test/regress/sql/citus_non_blocking_split_shard_cleanup.sql +++ b/src/test/regress/sql/citus_non_blocking_split_shard_cleanup.sql @@ -46,6 +46,11 @@ SELECT pg_catalog.citus_split_shard_by_split_points( ARRAY[:worker_2_node, :worker_2_node], 'force_logical'); +-- BEGIN: Perform deferred cleanup. +SET client_min_messages TO WARNING; +CALL pg_catalog.citus_cleanup_orphaned_resources(); +RESET client_min_messages; +-- END: Perform deferred cleanup. \c - - - :worker_1_port SET search_path TO "citus_split_test_schema"; diff --git a/src/test/regress/sql/citus_non_blocking_split_shards.sql b/src/test/regress/sql/citus_non_blocking_split_shards.sql index a387b07c5..2109a6902 100644 --- a/src/test/regress/sql/citus_non_blocking_split_shards.sql +++ b/src/test/regress/sql/citus_non_blocking_split_shards.sql @@ -149,7 +149,9 @@ SELECT pg_catalog.citus_split_shard_by_split_points( 'force_logical'); -- BEGIN: Perform deferred cleanup. +SET client_min_messages TO WARNING; CALL pg_catalog.citus_cleanup_orphaned_resources(); +RESET client_min_messages; -- END: Perform deferred cleanup. -- Perform 3 way split @@ -161,7 +163,9 @@ SELECT pg_catalog.citus_split_shard_by_split_points( -- END : Split two shards : One with move and One without move. -- BEGIN: Perform deferred cleanup. +SET client_min_messages TO WARNING; CALL pg_catalog.citus_cleanup_orphaned_resources(); +RESET client_min_messages; -- END: Perform deferred cleanup. 
-- BEGIN : Move a shard post split. @@ -263,7 +267,9 @@ SELECT pg_catalog.citus_split_shard_by_split_points( ARRAY[:worker_1_node, :worker_2_node]); -- BEGIN: Perform deferred cleanup. +SET client_min_messages TO WARNING; CALL pg_catalog.citus_cleanup_orphaned_resources(); +RESET client_min_messages; -- END: Perform deferred cleanup. SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport @@ -288,7 +294,9 @@ SELECT pg_catalog.citus_split_shard_by_split_points( 'auto'); -- BEGIN: Perform deferred cleanup. +SET client_min_messages TO WARNING; CALL pg_catalog.citus_cleanup_orphaned_resources(); +RESET client_min_messages; -- END: Perform deferred cleanup. SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport @@ -308,6 +316,11 @@ SELECT COUNT(*) FROM colocated_dist_table; --BEGIN : Cleanup \c - postgres - :master_port +-- make sure we don't have any replication objects leftover on the workers +SELECT run_command_on_workers($$SELECT count(*) FROM pg_replication_slots$$); +SELECT run_command_on_workers($$SELECT count(*) FROM pg_publication$$); +SELECT run_command_on_workers($$SELECT count(*) FROM pg_subscription$$); + ALTER SYSTEM RESET citus.defer_shard_delete_interval; SELECT pg_reload_conf(); DROP SCHEMA "citus_split_test_schema" CASCADE; diff --git a/src/test/regress/sql/failure_online_move_shard_placement.sql b/src/test/regress/sql/failure_online_move_shard_placement.sql index ce33ce6e0..31b51f1e7 100644 --- a/src/test/regress/sql/failure_online_move_shard_placement.sql +++ b/src/test/regress/sql/failure_online_move_shard_placement.sql @@ -134,7 +134,9 @@ SELECT citus.mitmproxy('conn.allow()'); SELECT run_command_on_workers($$DROP SUBSCRIPTION IF EXISTS citus_shard_move_subscription_10$$); -- cleanup leftovers -- verify we don't see any error for already dropped subscription +SET client_min_messages TO WARNING; CALL citus_cleanup_orphaned_resources(); +RESET client_min_messages; -- cancellation on dropping subscription SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").cancel(' || :pid || ')'); diff --git a/src/test/regress/sql/failure_split_cleanup.sql b/src/test/regress/sql/failure_split_cleanup.sql index 03b6cb45c..597a5f6bd 100644 --- a/src/test/regress/sql/failure_split_cleanup.sql +++ b/src/test/regress/sql/failure_split_cleanup.sql @@ -40,7 +40,7 @@ SELECT create_distributed_table('table_to_split', 'id'); ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; -- we need to allow connection so that we can connect to proxy SELECT citus.mitmproxy('conn.allow()'); @@ -60,7 +60,7 @@ SELECT create_distributed_table('table_to_split', 'id'); \c - postgres - :master_port CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; \c - - - :worker_2_proxy_port SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; @@ -92,7 +92,7 @@ SELECT create_distributed_table('table_to_split', 'id'); RESET client_min_messages; SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; -- we need to allow 
connection so that we can connect to proxy SELECT citus.mitmproxy('conn.allow()'); @@ -111,7 +111,7 @@ SELECT create_distributed_table('table_to_split', 'id'); \c - postgres - :master_port CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; \c - - - :worker_2_proxy_port SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; @@ -138,7 +138,7 @@ SELECT create_distributed_table('table_to_split', 'id'); ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; -- we need to allow connection so that we can connect to proxy SELECT citus.mitmproxy('conn.allow()'); @@ -157,7 +157,7 @@ SELECT create_distributed_table('table_to_split', 'id'); \c - postgres - :master_port CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; \c - - - :worker_2_proxy_port SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; @@ -184,7 +184,7 @@ SELECT create_distributed_table('table_to_split', 'id'); ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; -- we need to allow connection so that we can connect to proxy SELECT citus.mitmproxy('conn.allow()'); @@ -203,7 +203,7 @@ SELECT create_distributed_table('table_to_split', 'id'); \c - postgres - :master_port CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; \c - - - :worker_2_proxy_port SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; @@ -230,7 +230,7 @@ SELECT create_distributed_table('table_to_split', 'id'); ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; -- we need to allow connection so that we can connect to proxy SELECT citus.mitmproxy('conn.allow()'); @@ -249,7 +249,7 @@ SELECT create_distributed_table('table_to_split', 'id'); \c - postgres - :master_port CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; \c - - - :worker_2_proxy_port SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; @@ -277,7 +277,7 @@ SELECT create_distributed_table('table_to_split', 'id'); 'force_logical'); SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; SELECT relname FROM pg_class where relname LIKE 
'%table_to_split_%' AND relkind = 'r' order by relname; -- we need to allow connection so that we can connect to proxy SELECT citus.mitmproxy('conn.allow()'); @@ -297,7 +297,7 @@ SELECT create_distributed_table('table_to_split', 'id'); \c - postgres - :master_port CALL pg_catalog.citus_cleanup_orphaned_resources(); SELECT operation_id, object_type, object_name, node_group_id, policy_type - FROM pg_dist_cleanup where operation_id = 777; + FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name; \c - - - :worker_2_proxy_port SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog; diff --git a/src/test/regress/sql/issue_5763.sql b/src/test/regress/sql/issue_5763.sql new file mode 100644 index 000000000..c8c146ec2 --- /dev/null +++ b/src/test/regress/sql/issue_5763.sql @@ -0,0 +1,50 @@ +-- +-- ISSUE_5763 +-- +-- Issue: DROP OWNED BY fails to drop the schemas on the workers +-- Link: https://github.com/citusdata/citus/issues/5763 +-- + +CREATE USER issue_5763_1 WITH SUPERUSER; +CREATE USER issue_5763_2 WITH SUPERUSER; + +\c - issue_5763_1 - :master_port +CREATE SCHEMA issue_5763_sc_1; + +\c - issue_5763_2 - :master_port +CREATE SCHEMA issue_5763_sc_2; + +\c - postgres - :master_port +DROP OWNED BY issue_5763_1, issue_5763_2; + +\c - issue_5763_1 - :master_port +CREATE SCHEMA issue_5763_sc_1; + +\c - postgres - :master_port +DROP SCHEMA issue_5763_sc_1; +DROP USER issue_5763_1, issue_5763_2; + +-- test CASCADE options +CREATE USER issue_5763_3 WITH SUPERUSER; + +\c - issue_5763_3 - :master_port +CREATE SCHEMA issue_5763_sc_3; +CREATE TABLE issue_5763_sc_3.tb1(id int); + +\c - postgres - :master_port +DROP OWNED BY issue_5763_3 CASCADE; + +DROP USER issue_5763_3; + +-- test non-distributed role +SET citus.enable_create_role_propagation TO off; +CREATE USER issue_5763_4 WITH SUPERUSER; + +\c - issue_5763_4 - :master_port +set citus.enable_ddl_propagation = off; +CREATE SCHEMA issue_5763_sc_4; + +\c - postgres - :master_port +DROP OWNED BY issue_5763_4 RESTRICT; + +DROP USER issue_5763_4; diff --git a/src/test/regress/sql/multi_multiuser.sql b/src/test/regress/sql/multi_multiuser.sql index ef9700413..150abe307 100644 --- a/src/test/regress/sql/multi_multiuser.sql +++ b/src/test/regress/sql/multi_multiuser.sql @@ -328,9 +328,6 @@ DROP TABLE test, test_coloc, colocation_table; -SELECT run_command_on_workers($$DROP OWNED BY full_access$$); -SELECT run_command_on_workers($$DROP OWNED BY some_role$$); -SELECT run_command_on_workers($$DROP OWNED BY read_access$$); DROP USER full_access; DROP USER read_access; DROP USER no_access; diff --git a/src/test/regress/sql/multi_schema_support.sql b/src/test/regress/sql/multi_schema_support.sql index 01873659f..944667c2a 100644 --- a/src/test/regress/sql/multi_schema_support.sql +++ b/src/test/regress/sql/multi_schema_support.sql @@ -842,7 +842,6 @@ SET citus.next_shard_id TO 1197000; -- we do not use run_command_on_coordinator_and_workers here because when there is CASCADE, it causes deadlock DROP OWNED BY "test-user" CASCADE; -SELECT run_command_on_workers('DROP OWNED BY "test-user" CASCADE'); DROP USER "test-user"; DROP FUNCTION run_command_on_coordinator_and_workers(p_sql text); diff --git a/src/test/regress/sql/pg15.sql b/src/test/regress/sql/pg15.sql index 04dc46910..c334671bc 100644 --- a/src/test/regress/sql/pg15.sql +++ b/src/test/regress/sql/pg15.sql @@ -924,6 +924,21 @@ DROP SERVER foreign_server CASCADE; CREATE DATABASE db_with_oid OID 987654; DROP DATABASE db_with_oid; +-- SET ACCESS METHOD +-- Create a heap2 table am 
handler with heapam handler +CREATE ACCESS METHOD heap2 TYPE TABLE HANDLER heap_tableam_handler; +SELECT run_command_on_workers($$CREATE ACCESS METHOD heap2 TYPE TABLE HANDLER heap_tableam_handler$$); +CREATE TABLE mx_ddl_table2 ( + key int primary key, + value int +); +SELECT create_distributed_table('mx_ddl_table2', 'key', 'hash', shard_count=> 4); +ALTER TABLE mx_ddl_table2 SET ACCESS METHOD heap2; + +DROP TABLE mx_ddl_table2; +DROP ACCESS METHOD heap2; +SELECT run_command_on_workers($$DROP ACCESS METHOD heap2$$); + -- Clean up \set VERBOSITY terse SET client_min_messages TO ERROR; diff --git a/src/test/regress/sql/single_node_enterprise.sql b/src/test/regress/sql/single_node_enterprise.sql index 9ad590bb8..76615b852 100644 --- a/src/test/regress/sql/single_node_enterprise.sql +++ b/src/test/regress/sql/single_node_enterprise.sql @@ -313,8 +313,6 @@ DROP SCHEMA single_node_ent CASCADE; DROP OWNED BY full_access_single_node; DROP OWNED BY read_access_single_node; -SELECT run_command_on_workers($$DROP OWNED BY full_access_single_node$$); -SELECT run_command_on_workers($$DROP OWNED BY read_access_single_node$$); DROP ROLE full_access_single_node; DROP ROLE read_access_single_node;
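-- For reference, a minimal sketch of the background job lifecycle that the
-- background_task_queue_monitor tests above rely on, assuming a psql session on
-- the coordinator; the job description, command, and sleep duration below are
-- illustrative, not taken from the diff:
--
--   BEGIN;
--   INSERT INTO pg_dist_background_job (job_type, description)
--       VALUES ('test_job', 'illustrative job') RETURNING job_id AS job_id \gset
--   INSERT INTO pg_dist_background_task (job_id, command)
--       VALUES (:job_id, $job$ SELECT pg_sleep(60); $job$) RETURNING task_id AS task_id \gset
--   COMMIT;
--   SELECT citus_job_wait(:job_id, desired_status => 'running'); -- block until the monitor starts the task
--   SELECT citus_job_cancel(:job_id);                            -- signal cancellation to the monitor
--   SELECT citus_job_wait(:job_id);                              -- wait for the job to reach a terminal state
--   SELECT status FROM pg_dist_background_task
--       WHERE task_id = :task_id;                                -- expected to show 'cancelled'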