Merge remote-tracking branch 'upstream/main' into issue/6440

test-6495
Gokhan Gulbiz 2022-12-03 12:16:19 +03:00
commit d1db09198c
No known key found for this signature in database
GPG Key ID: 608EF06B6BD1B45B
41 changed files with 1263 additions and 175 deletions


@@ -167,8 +167,9 @@ jobs:
           name: 'Copy coredumps'
           command: |
             mkdir -p /tmp/core_dumps
-            if ls core.* 1> /dev/null 2>&1; then
-              cp core.* /tmp/core_dumps
+            core_files=( $(find . -type f -regex .*core.*\d*.*postgres) )
+            if [ ${#core_files[@]} -gt 0 ]; then
+              cp "${core_files[@]}" /tmp/core_dumps
             fi
           when: on_fail
       - run:
@@ -258,8 +259,9 @@ jobs:
           name: 'Copy coredumps'
           command: |
             mkdir -p /tmp/core_dumps
-            if ls core.* 1> /dev/null 2>&1; then
-              cp core.* /tmp/core_dumps
+            core_files=( $(find . -type f -regex .*core.*\d*.*postgres) )
+            if [ ${#core_files[@]} -gt 0 ]; then
+              cp "${core_files[@]}" /tmp/core_dumps
             fi
           when: on_fail
       - store_artifacts:
@@ -339,8 +341,9 @@ jobs:
           name: 'Copy coredumps'
           command: |
             mkdir -p /tmp/core_dumps
-            if ls core.* 1> /dev/null 2>&1; then
-              cp core.* /tmp/core_dumps
+            core_files=( $(find . -type f -regex .*core.*\d*.*postgres) )
+            if [ ${#core_files[@]} -gt 0 ]; then
+              cp "${core_files[@]}" /tmp/core_dumps
             fi
           when: on_fail
       - store_artifacts:
@@ -405,8 +408,9 @@ jobs:
           name: 'Copy coredumps'
           command: |
             mkdir -p /tmp/core_dumps
-            if ls core.* 1> /dev/null 2>&1; then
-              cp core.* /tmp/core_dumps
+            core_files=( $(find . -type f -regex .*core.*\d*.*postgres) )
+            if [ ${#core_files[@]} -gt 0 ]; then
+              cp "${core_files[@]}" /tmp/core_dumps
             fi
           when: on_fail
       - store_artifacts:
@@ -483,8 +487,9 @@ jobs:
           name: 'Copy coredumps'
           command: |
             mkdir -p /tmp/core_dumps
-            if ls core.* 1> /dev/null 2>&1; then
-              cp core.* /tmp/core_dumps
+            core_files=( $(find . -type f -regex .*core.*\d*.*postgres) )
+            if [ ${#core_files[@]} -gt 0 ]; then
+              cp "${core_files[@]}" /tmp/core_dumps
             fi
           when: on_fail
       - store_artifacts:
@@ -627,6 +632,9 @@ jobs:
           name: 'Copy coredumps'
           command: |
             mkdir -p /tmp/core_dumps
-            if ls core.* 1> /dev/null 2>&1; then
-              cp core.* /tmp/core_dumps
+            core_files=( $(find . -type f -regex .*core.*\d*.*postgres) )
+            if [ ${#core_files[@]} -gt 0 ]; then
+              cp "${core_files[@]}" /tmp/core_dumps
             fi
           when: on_fail
       - store_artifacts:
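
For context, a standalone sketch (bash, hypothetical paths; the regex is quoted here for safety) of what the new find-based step picks up that the old top-level `core.*` glob missed:

```bash
# Core files from crashed postgres backends can land in subdirectories,
# e.g. under the regress test folders; `ls core.*` only saw the top level.
mkdir -p /tmp/demo/src/test/regress
touch /tmp/demo/src/test/regress/core.1234.postgres  # matched at any depth
touch /tmp/demo/core.log                             # not matched: no "postgres" suffix
cd /tmp/demo
find . -type f -regex ".*core.*\d*.*postgres"
# prints: ./src/test/regress/core.1234.postgres
```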


@@ -0,0 +1,3 @@
base:
- ".* warning: ignoring old recipe for target [`']check'"
- ".* warning: overriding recipe for target [`']check'"

.github/packaging/validate_build_output.sh (new executable file)

@@ -0,0 +1,6 @@
package_type=${1}
git clone -b v0.8.23 --depth=1 https://github.com/citusdata/tools.git tools
python3 -m pip install -r tools/packaging_automation/requirements.txt
python3 -m tools.packaging_automation.validate_build_output --output_file output.log \
--ignore_file .github/packaging/packaging_ignore.yml \
--package_type ${package_type}
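
For intuition, a rough Python sketch (hypothetical; the real check lives in the packaging_automation tool cloned above) of what the validation amounts to: fail if output.log contains a warning matching none of the packaging_ignore.yml patterns.

```python
import re
import sys

import yaml  # PyYAML, pulled in via the requirements.txt above

with open(".github/packaging/packaging_ignore.yml") as f:
    patterns = [re.compile(p) for p in yaml.safe_load(f)["base"]]

with open("output.log") as log:
    unexpected = [line for line in log
                  if "warning:" in line
                  and not any(p.search(line) for p in patterns)]

sys.exit(1 if unexpected else 0)
```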


@@ -0,0 +1,158 @@
name: Build tests in packaging images
on:
push:
branches: "**"
workflow_dispatch:
jobs:
get_postgres_versions_from_file:
runs-on: ubuntu-latest
outputs:
pg_versions: ${{ steps.get-postgres-versions.outputs.pg_versions }}
steps:
- name: Checkout
uses: actions/checkout@v3
with:
fetch-depth: 2
- name: Get Postgres Versions
id: get-postgres-versions
run: |
# Postgres versions are stored in the .circleci/config.yml file in "build-[pg-version]" format.
# The command below extracts the versions and gets the unique values.
pg_versions=`grep -Eo 'build-[[:digit:]]{2}' .circleci/config.yml|sed -e "s/^build-//"|sort|uniq|tr '\n' ','| head -c -1`
pg_versions_array="[ ${pg_versions} ]"
echo "Supported PG Versions: ${pg_versions_array}"
# The line below sets the output variable that is consumed by the next job
echo "pg_versions=${pg_versions_array}" >> $GITHUB_OUTPUT
rpm_build_tests:
name: rpm_build_tests
needs: get_postgres_versions_from_file
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
# We use separate images for different Postgres versions on rpm-based
# distros. For this reason, we need a "matrix" to generate the names
# of the rpm images, e.g. citus/packaging:centos-7-pg12.
packaging_docker_image:
- oraclelinux-7
- oraclelinux-8
- centos-7
- centos-8
- almalinux-9
POSTGRES_VERSION: ${{ fromJson(needs.get_postgres_versions_from_file.outputs.pg_versions) }}
container:
image: citus/packaging:${{ matrix.packaging_docker_image }}-pg${{ matrix.POSTGRES_VERSION }}
steps:
- name: Checkout repository
uses: actions/checkout@v3
- name: Add Postgres installation directory into PATH for rpm based distros
run: |
echo "/usr/pgsql-${{ matrix.POSTGRES_VERSION }}/bin" >> $GITHUB_PATH
- name: Configure
run: |
echo "Current Shell:$0"
echo "GCC Version: $(gcc --version)"
./configure 2>&1 | tee output.log
- name: Make clean
run: |
make clean
- name: Make
run: |
make CFLAGS="-Wno-missing-braces" -sj$(cat /proc/cpuinfo | grep "core id" | wc -l) 2>&1 | tee -a output.log
- name: Make install
run: |
make CFLAGS="-Wno-missing-braces" install 2>&1 | tee -a output.log
- name: Validate output
env:
POSTGRES_VERSION: ${{ matrix.POSTGRES_VERSION }}
PACKAGING_DOCKER_IMAGE: ${{ matrix.packaging_docker_image }}
run: |
echo "Postgres version: ${POSTGRES_VERSION}"
## Install required packages to execute packaging tools for rpm based distros
yum install python3-pip python3-devel postgresql-devel -y
python3 -m pip install wheel
./.github/packaging/validate_build_output.sh "rpm"
deb_build_tests:
name: deb_build_tests
needs: get_postgres_versions_from_file
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
# On deb based distros, we use the same docker image for
# builds based on different Postgres versions because deb
# based images include all postgres installations.
# For this reason, we have multiple runs (3 today)
# for each deb based image and we use POSTGRES_VERSION to set
# PG_CONFIG variable in each of those runs.
packaging_docker_image:
- debian-buster-all
- debian-bookworm-all
- debian-bullseye-all
- ubuntu-bionic-all
- ubuntu-focal-all
- ubuntu-jammy-all
- ubuntu-kinetic-all
POSTGRES_VERSION: ${{ fromJson(needs.get_postgres_versions_from_file.outputs.pg_versions) }}
container:
image: citus/packaging:${{ matrix.packaging_docker_image }}
steps:
- name: Checkout repository
uses: actions/checkout@v3
- name: Set pg_config path to related Postgres version
run: |
echo "PG_CONFIG=/usr/lib/postgresql/${{ matrix.POSTGRES_VERSION }}/bin/pg_config" >> $GITHUB_ENV
- name: Configure
run: |
echo "Current Shell:$0"
echo "GCC Version: $(gcc --version)"
./configure 2>&1 | tee output.log
- name: Make clean
run: |
make clean
- name: Make
run: |
make -sj$(cat /proc/cpuinfo | grep "core id" | wc -l) 2>&1 | tee -a output.log
- name: Make install
run: |
make install 2>&1 | tee -a output.log
- name: Validate output
env:
POSTGRES_VERSION: ${{ matrix.POSTGRES_VERSION }}
PACKAGING_DOCKER_IMAGE: ${{ matrix.packaging_docker_image }}
run: |
echo "Postgres version: ${POSTGRES_VERSION}"
apt-get update -y
## Install required packages to execute packaging tools for deb based distros
apt install python3-dev python3-pip -y
sudo apt-get purge -y python3-yaml
python3 -m pip install --upgrade pip setuptools==57.5.0
./.github/packaging/validate_build_output.sh "deb"
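
To see what the version-extraction pipeline in get_postgres_versions_from_file produces, here is a minimal sketch (bash, sample input):

```bash
printf 'build-13\nbuild-14\nbuild-15\nbuild-14\n' > /tmp/sample_config.yml
pg_versions=$(grep -Eo 'build-[[:digit:]]{2}' /tmp/sample_config.yml \
    | sed -e "s/^build-//" | sort | uniq | tr '\n' ',' | head -c -1)  # GNU head drops trailing comma
echo "[ ${pg_versions} ]"
# prints: [ 13,14,15 ]
```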


@@ -234,16 +234,14 @@ CREATE TABLE perf_columnar(LIKE perf_row) USING COLUMNAR;
 ## Data
 ```sql
-CREATE OR REPLACE FUNCTION random_words(n INT4) RETURNS TEXT LANGUAGE plpython2u AS $$
-import random
-t = ''
-words = ['zero','one','two','three','four','five','six','seven','eight','nine','ten']
-for i in xrange(0,n):
-    if (i != 0):
-        t += ' '
-    r = random.randint(0,len(words)-1)
-    t += words[r]
-return t
+CREATE OR REPLACE FUNCTION random_words(n INT4) RETURNS TEXT LANGUAGE sql AS $$
+WITH words(w) AS (
+  SELECT ARRAY['zero','one','two','three','four','five','six','seven','eight','nine','ten']
+),
+random (word) AS (
+  SELECT w[(random()*array_length(w, 1))::int] FROM generate_series(1, $1) AS i, words
+)
+SELECT string_agg(word, ' ') FROM random;
 $$;
 ```
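
For reference, a hypothetical invocation of the rewritten function (the word sequence varies per run):

```sql
SELECT random_words(4);
--     random_words
-- ----------------------
--  seven one nine three
```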


@@ -254,6 +254,15 @@ static DistributeObjectOps Any_CreateRole = {
 	.address = CreateRoleStmtObjectAddress,
 	.markDistributed = true,
 };
+static DistributeObjectOps Any_DropOwned = {
+	.deparse = DeparseDropOwnedStmt,
+	.qualify = NULL,
+	.preprocess = PreprocessDropOwnedStmt,
+	.postprocess = NULL,
+	.operationType = DIST_OPS_DROP,
+	.address = NULL,
+	.markDistributed = false,
+};
 static DistributeObjectOps Any_DropRole = {
 	.deparse = DeparseDropRoleStmt,
 	.qualify = NULL,
@@ -1658,6 +1667,11 @@ GetDistributeObjectOps(Node *node)
 			return &Any_DropRole;
 		}

+		case T_DropOwnedStmt:
+		{
+			return &Any_DropOwned;
+		}
+
 		case T_DropStmt:
 		{
 			DropStmt *stmt = castNode(DropStmt, node);


@@ -0,0 +1,90 @@
/*-------------------------------------------------------------------------
*
* owned.c
* Commands for DROP OWNED statements.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/genam.h"
#include "access/table.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/pg_auth_members.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_db_role_setting.h"
#include "catalog/pg_type.h"
#include "catalog/objectaddress.h"
#include "commands/dbcommands.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata/distobject.h"
#include "distributed/multi_executor.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/version_compat.h"
#include "distributed/worker_transaction.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
#include "parser/scansup.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/rel.h"
#include "utils/varlena.h"
#include "utils/syscache.h"
/*
 * PreprocessDropOwnedStmt finds the distributed roles among the ones being
 * dropped, unmarks them as distributed, and creates the drop statements for
 * the workers.
 */
List *
PreprocessDropOwnedStmt(Node *node, const char *queryString,
ProcessUtilityContext processUtilityContext)
{
DropOwnedStmt *stmt = castNode(DropOwnedStmt, node);
List *allDropRoles = stmt->roles;
List *distributedDropRoles = FilterDistributedRoles(allDropRoles);
if (list_length(distributedDropRoles) <= 0)
{
return NIL;
}
if (!ShouldPropagate())
{
return NIL;
}
/* check creation against multi-statement transaction policy */
if (!ShouldPropagateCreateInCoordinatedTransction())
{
return NIL;
}
EnsureCoordinator();
stmt->roles = distributedDropRoles;
char *sql = DeparseTreeNode((Node *) stmt);
stmt->roles = allDropRoles;
List *commands = list_make3(DISABLE_DDL_PROPAGATION,
sql,
ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
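
As a rough illustration (hypothetical role name; assumes the role is marked as distributed and DDL propagation is enabled), the preprocess hook makes a coordinator-side DROP OWNED run on the workers too, wrapped in the usual propagation guards:

```sql
-- on the coordinator:
DROP OWNED BY application_role;
-- each worker then receives, per the NodeDDLTaskList above:
--   SET citus.enable_ddl_propagation TO 'off';
--   DROP OWNED BY application_role;
--   SET citus.enable_ddl_propagation TO 'on';
```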


@@ -2992,6 +2992,9 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
 				break;
 			}

+#if PG_VERSION_NUM >= PG_VERSION_15
+		case AT_SetAccessMethod:
+#endif
 		case AT_SetNotNull:
 		case AT_ReplicaIdentity:
 		case AT_ChangeOwner:
@@ -3007,6 +3010,7 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
 		{
 			/*
 			 * We will not perform any special check for:
+			 * ALTER TABLE .. SET ACCESS METHOD ..
 			 * ALTER TABLE .. ALTER COLUMN .. SET NOT NULL
 			 * ALTER TABLE .. REPLICA IDENTITY ..
 			 * ALTER TABLE .. VALIDATE CONSTRAINT ..


@@ -0,0 +1,84 @@
/*-------------------------------------------------------------------------
 *
 * deparse_owned_stmts.c
 *	  Functions to turn Statement structures related to OWNED back
 *	  into SQL.
 *
 * Copyright (c) Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"
#include "pg_version_compat.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/deparser.h"
#include "lib/stringinfo.h"
#include "nodes/parsenodes.h"
#include "utils/builtins.h"
static void AppendDropOwnedStmt(StringInfo buf, DropOwnedStmt *stmt);
static void AppendRoleList(StringInfo buf, List *roleList);
/*
 * DeparseDropOwnedStmt builds and returns a string representation of the
 * DropOwnedStmt for application on a remote server.
 */
char *
DeparseDropOwnedStmt(Node *node)
{
DropOwnedStmt *stmt = castNode(DropOwnedStmt, node);
StringInfoData buf = { 0 };
initStringInfo(&buf);
AppendDropOwnedStmt(&buf, stmt);
return buf.data;
}
/*
* AppendDropOwnedStmt generates the string representation of the
* DropOwnedStmt and appends it to the buffer.
*/
static void
AppendDropOwnedStmt(StringInfo buf, DropOwnedStmt *stmt)
{
appendStringInfo(buf, "DROP OWNED BY ");
AppendRoleList(buf, stmt->roles);
if (stmt->behavior == DROP_RESTRICT)
{
appendStringInfo(buf, " RESTRICT");
}
else if (stmt->behavior == DROP_CASCADE)
{
appendStringInfo(buf, " CASCADE");
}
}
/*
 * AppendRoleList appends a comma-separated list of the given role names to
 * the buffer.
 */
static void
AppendRoleList(StringInfo buf, List *roleList)
{
ListCell *cell = NULL;
foreach(cell, roleList)
{
Node *roleNode = (Node *) lfirst(cell);
Assert(IsA(roleNode, RoleSpec) || IsA(roleNode, AccessPriv));
char const *rolename = NULL;
if (IsA(roleNode, RoleSpec))
{
rolename = RoleSpecString((RoleSpec *) roleNode, true);
}
appendStringInfoString(buf, rolename);
if (cell != list_tail(roleList))
{
appendStringInfo(buf, ", ");
}
}
}
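
For example, given a statement naming two roles with CASCADE, the deparser above yields a single command string (hypothetical role names):

```sql
DROP OWNED BY app_owner, app_reader CASCADE
```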


@@ -2232,7 +2232,7 @@ SetWorkerColumnOptional(WorkerNode *workerNode, int columnIndex, Datum value)
 		{
 			/* metadata out of sync, mark the worker as not synced */
 			ereport(WARNING, (errmsg("Updating the metadata of the node %s:%d "
-									 "is failed on node %s:%d."
+									 "is failed on node %s:%d. "
 									 "Metadata on %s:%d is marked as out of sync.",
 									 workerNode->workerName, workerNode->workerPort,
 									 worker->workerName, worker->workerPort,


@@ -553,7 +553,7 @@ CheckSpaceConstraints(MultiConnection *connection, uint64 colocationSizeInBytes)
 	{
 		ereport(ERROR, (errmsg("not enough empty space on node if the shard is moved, "
 							   "actual available space after move will be %ld bytes, "
-							   "desired available space after move is %ld bytes,"
+							   "desired available space after move is %ld bytes, "
 							   "estimated size increase on node after move is %ld bytes.",
 							   diskAvailableInBytesAfterShardMove,
 							   desiredNewDiskAvailableInBytes, colocationSizeInBytes),


@@ -604,7 +604,7 @@ CreateReplicaIdentitiesOnNode(List *shardList, char *nodeName, int32 nodePort)
 	if (commandList != NIL)
 	{
-		ereport(DEBUG1, (errmsg("Creating replica identity for shard %ld on"
+		ereport(DEBUG1, (errmsg("Creating replica identity for shard %ld on "
 								"target node %s:%d", shardId, nodeName, nodePort)));

 		SendCommandListToWorkerOutsideTransaction(nodeName, nodePort,


@@ -1054,8 +1054,8 @@ RegisterCitusConfigVariables(void)
 		gettext_noop(
 			"Sets how many percentage of free disk space should be after a shard move"),
 		gettext_noop(
-			"This setting controls how much free space should be available after a shard move."
-			"If the free disk space will be lower than this parameter, then shard move will result in"
+			"This setting controls how much free space should be available after a shard move. "
+			"If the free disk space will be lower than this parameter, then shard move will result in "
 			"an error."),
 		&DesiredPercentFreeAfterMove,
 		10.0, 0.0, 100.0,
@@ -1448,7 +1448,7 @@ RegisterCitusConfigVariables(void)
 			"parallelization"),
 		gettext_noop("When enabled, Citus will force the executor to use "
 					 "as many connections as possible while executing a "
-					 "parallel distributed query. If not enabled, the executor"
+					 "parallel distributed query. If not enabled, the executor "
 					 "might choose to use less connections to optimize overall "
 					 "query execution throughput. Internally, setting this true "
 					 "will end up with using one connection per task."),
@@ -1487,7 +1487,7 @@ RegisterCitusConfigVariables(void)
 	DefineCustomBoolVariable(
 		"citus.hide_citus_dependent_objects",
 		gettext_noop(
-			"Hides some objects, which depends on citus extension, from pg meta class queries."
+			"Hides some objects, which depends on citus extension, from pg meta class queries. "
 			"It is intended to be used only before postgres vanilla tests to not break them."),
 		NULL,
 		&HideCitusDependentObjects,
@@ -1594,10 +1594,10 @@ RegisterCitusConfigVariables(void)
 		gettext_noop("defines the behaviour when a distributed table "
 					 "is joined with a local table"),
 		gettext_noop(
-			"There are 4 values available. The default, 'auto' will recursively plan"
-			"distributed tables if there is a constant filter on a unique index."
-			"'prefer-local' will choose local tables if possible."
-			"'prefer-distributed' will choose distributed tables if possible"
+			"There are 4 values available. The default, 'auto' will recursively plan "
+			"distributed tables if there is a constant filter on a unique index. "
+			"'prefer-local' will choose local tables if possible. "
+			"'prefer-distributed' will choose distributed tables if possible. "
 			"'never' will basically skip local table joins."
 			),
 		&LocalTableJoinPolicy,
@@ -1816,11 +1816,11 @@ RegisterCitusConfigVariables(void)
 					 "health status are tracked in a shared hash table on "
 					 "the master node. This configuration value limits the "
 					 "size of the hash table, and consequently the maximum "
-					 "number of worker nodes that can be tracked."
+					 "number of worker nodes that can be tracked. "
 					 "Citus keeps some information about the worker nodes "
 					 "in the shared memory for certain optimizations. The "
 					 "optimizations are enforced up to this number of worker "
-					 "nodes. Any additional worker nodes may not benefit from"
+					 "nodes. Any additional worker nodes may not benefit from "
 					 "the optimizations."),
 		&MaxWorkerNodesTracked,
 		2048, 1024, INT_MAX,
@@ -1994,7 +1994,7 @@ RegisterCitusConfigVariables(void)
 		gettext_noop("When enabled, the executor waits until all the connections "
 					 "are successfully established."),
 		gettext_noop("Under some load, the executor may decide to establish some "
-					 "extra connections to further parallelize the execution. However,"
+					 "extra connections to further parallelize the execution. However, "
 					 "before the connection establishment is done, the execution might "
 					 "have already finished. When this GUC is set to true, the execution "
 					 "waits for such connections to be established."),
@@ -2092,7 +2092,7 @@ RegisterCitusConfigVariables(void)
 		"citus.replication_model",
 		gettext_noop("Deprecated. Please use citus.shard_replication_factor instead"),
 		gettext_noop(
-			"Shard replication model is determined by the shard replication factor."
+			"Shard replication model is determined by the shard replication factor. "
 			"'statement' replication is used only when the replication factor is one."),
 		&ReplicationModel,
 		REPLICATION_MODEL_STREAMING,
@@ -2178,7 +2178,7 @@ RegisterCitusConfigVariables(void)
 		"citus.skip_advisory_lock_permission_checks",
 		gettext_noop("Postgres would normally enforce some "
 					 "ownership checks while acquiring locks. "
-					 "When this setting is 'off', Citus skips"
+					 "When this setting is 'on', Citus skips "
 					 "ownership checks on internal advisory "
 					 "locks."),
 		NULL,
@@ -2225,7 +2225,7 @@ RegisterCitusConfigVariables(void)
 		gettext_noop("This feature is not intended for users. It is developed "
 					 "to get consistent regression test outputs. When enabled, "
 					 "the RETURNING clause returns the tuples sorted. The sort "
-					 "is done for all the entries, starting from the first one."
+					 "is done for all the entries, starting from the first one. "
 					 "Finally, the sorting is done in ASC order."),
 		&SortReturning,
 		false,
@@ -2283,7 +2283,7 @@ RegisterCitusConfigVariables(void)
 					 "It means that the queries are likely to return wrong results "
 					 "unless the user is absolutely sure that pushing down the "
 					 "subquery is safe. This GUC is maintained only for backward "
-					 "compatibility, no new users are supposed to use it. The planner"
+					 "compatibility, no new users are supposed to use it. The planner "
 					 "is capable of pushing down as much computation as possible to the "
 					 "shards depending on the query."),
 		&SubqueryPushdown,
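
The common thread in these message fixes is C's adjacent string literal concatenation, which joins fragments with no separator; a tiny standalone illustration:

```c
#include <stdio.h>

int main(void)
{
	/* adjacent literals concatenate with no space in between */
	const char *broken = "use it. The planner"
	                     "is capable of pushing down";
	const char *fixed = "use it. The planner "
	                    "is capable of pushing down";
	printf("%s\n", broken); /* use it. The planneris capable of pushing down */
	printf("%s\n", fixed);  /* use it. The planner is capable of pushing down */
	return 0;
}
```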

View File

@@ -72,12 +72,14 @@
 #define CITUS_BACKGROUND_TASK_KEY_COMMAND 2
 #define CITUS_BACKGROUND_TASK_KEY_QUEUE 3
 #define CITUS_BACKGROUND_TASK_KEY_TASK_ID 4
-#define CITUS_BACKGROUND_TASK_NKEYS 5
+#define CITUS_BACKGROUND_TASK_KEY_JOB_ID 5
+#define CITUS_BACKGROUND_TASK_NKEYS 6

 static BackgroundWorkerHandle * StartCitusBackgroundTaskExecutor(char *database,
 																 char *user,
 																 char *command,
 																 int64 taskId,
+																 int64 jobId,
 																 dsm_segment **pSegment);
 static void ExecuteSqlString(const char *sql);
 static shm_mq_result ConsumeTaskWorkerOutput(shm_mq_handle *responseq, StringInfo message,
@@ -106,6 +108,18 @@ static TaskExecutionStatus ConsumeExecutorQueue(
 	TaskExecutionContext *taskExecutionContext);
 static void TaskHadError(TaskExecutionContext *taskExecutionContext);
 static void TaskEnded(TaskExecutionContext *taskExecutionContext);
+static void TerminateAllTaskExecutors(HTAB *currentExecutors);
+static HTAB * GetRunningUniqueJobIds(HTAB *currentExecutors);
+static void CancelAllTaskExecutors(HTAB *currentExecutors);
+static bool MonitorGotTerminationOrCancellationRequest();
+static void QueueMonitorSigTermHandler(SIGNAL_ARGS);
+static void QueueMonitorSigIntHandler(SIGNAL_ARGS);
+static void QueueMonitorSigHupHandler(SIGNAL_ARGS);
+
+/* flags set by signal handlers */
+static volatile sig_atomic_t GotSigterm = false;
+static volatile sig_atomic_t GotSigint = false;
+static volatile sig_atomic_t GotSighup = false;

 PG_FUNCTION_INFO_V1(citus_job_cancel);
 PG_FUNCTION_INFO_V1(citus_job_wait);
@@ -337,8 +351,6 @@ CitusBackgroundTaskQueueMonitorErrorCallback(void *arg)
 /*
  * NewExecutorExceedsCitusLimit returns true if currently we reached Citus' max worker count.
- * It also sleeps 1 sec to let running tasks progress so that we have better chance to not hit
- * that limit again.
  */
 static bool
 NewExecutorExceedsCitusLimit(QueueMonitorExecutionContext *queueMonitorExecutionContext)
@@ -372,8 +384,6 @@ NewExecutorExceedsCitusLimit(QueueMonitorExecutionContext *queueMonitorExecution
 /*
  * NewExecutorExceedsPgMaxWorkers returns true if currently we reached Postgres' max worker count.
- * It also sleeps 1 sec to let running tasks progress so that we have better chance to not hit
- * that limit again.
  */
 static bool
 NewExecutorExceedsPgMaxWorkers(BackgroundWorkerHandle *handle,
@@ -389,8 +399,7 @@ NewExecutorExceedsPgMaxWorkers(BackgroundWorkerHandle *handle,
 	 */
 	if (queueMonitorExecutionContext->backgroundWorkerFailedStartTime == 0)
 	{
-		ereport(WARNING, (errmsg(
-							  "unable to start background worker for "
+		ereport(WARNING, (errmsg("unable to start background worker for "
 								 "background task execution"),
 						  errdetail(
 							  "Current number of task "
@@ -432,7 +441,7 @@ AssignRunnableTaskToNewExecutor(BackgroundTask *runnableTask,
 	dsm_segment *seg = NULL;
 	BackgroundWorkerHandle *handle =
 		StartCitusBackgroundTaskExecutor(databaseName, userName, runnableTask->command,
-										 runnableTask->taskid, &seg);
+										 runnableTask->taskid, runnableTask->jobid, &seg);
 	MemoryContextSwitchTo(oldContext);

 	if (NewExecutorExceedsPgMaxWorkers(handle, queueMonitorExecutionContext))
@@ -450,6 +459,7 @@ AssignRunnableTaskToNewExecutor(BackgroundTask *runnableTask,
 	Assert(!handleEntryFound);
 	handleEntry->handle = handle;
 	handleEntry->seg = seg;
+	handleEntry->jobid = runnableTask->jobid;

 	/* reset worker allocation timestamp and log time elapsed since the last failure */
 	CheckAndResetLastWorkerAllocationFailure(queueMonitorExecutionContext);
@@ -543,7 +553,7 @@ CheckAndResetLastWorkerAllocationFailure(
 		GetCurrentTimestamp(),
 		&secs, &microsecs);
 	ereport(LOG, (errmsg(
-					  "able to start a background worker with %ld seconds"
+					  "able to start a background worker with %ld seconds "
 					  "delay", secs)));

 	queueMonitorExecutionContext->backgroundWorkerFailedStartTime = 0;
@@ -741,6 +751,138 @@ TaskEnded(TaskExecutionContext *taskExecutionContext)
 }

+
+/*
+ * QueueMonitorSigHupHandler handles SIGHUP to update monitor related config params.
+ */
+static void
+QueueMonitorSigHupHandler(SIGNAL_ARGS)
+{
+	int saved_errno = errno;
+
+	GotSighup = true;
+
+	if (MyProc)
+	{
+		SetLatch(&MyProc->procLatch);
+	}
+
+	errno = saved_errno;
+}
+
+
+/*
+ * MonitorGotTerminationOrCancellationRequest returns true if monitor had SIGTERM or SIGINT signals
+ */
+static bool
+MonitorGotTerminationOrCancellationRequest()
+{
+	return GotSigterm || GotSigint;
+}
+
+
+/*
+ * QueueMonitorSigTermHandler handles SIGTERM by setting a flag to inform the monitor process
+ * so that it can terminate active task executors properly. It also sets the latch to awake the
+ * monitor if it waits on it.
+ */
+static void
+QueueMonitorSigTermHandler(SIGNAL_ARGS)
+{
+	int saved_errno = errno;
+
+	GotSigterm = true;
+
+	if (MyProc)
+	{
+		SetLatch(&MyProc->procLatch);
+	}
+
+	errno = saved_errno;
+}
+
+
+/*
+ * QueueMonitorSigIntHandler handles SIGINT by setting a flag to inform the monitor process
+ * so that it can terminate active task executors properly. It also sets the latch to awake the
+ * monitor if it waits on it.
+ */
+static void
+QueueMonitorSigIntHandler(SIGNAL_ARGS)
+{
+	int saved_errno = errno;
+
+	GotSigint = true;
+
+	if (MyProc)
+	{
+		SetLatch(&MyProc->procLatch);
+	}
+
+	errno = saved_errno;
+}
+
+
+/*
+ * TerminateAllTaskExecutors terminates task executors given in the hash map.
+ */
+static void
+TerminateAllTaskExecutors(HTAB *currentExecutors)
+{
+	HASH_SEQ_STATUS status;
+	BackgroundExecutorHashEntry *backgroundExecutorHashEntry;
+	foreach_htab(backgroundExecutorHashEntry, &status, currentExecutors)
+	{
+		TerminateBackgroundWorker(backgroundExecutorHashEntry->handle);
+	}
+}
+
+
+/*
+ * GetRunningUniqueJobIds returns unique job ids from currentExecutors
+ */
+static HTAB *
+GetRunningUniqueJobIds(HTAB *currentExecutors)
+{
+	/* create a set to store unique job ids for currently executing tasks */
+	HTAB *uniqueJobIds = CreateSimpleHashSetWithSize(int64, MAX_BG_TASK_EXECUTORS);
+
+	HASH_SEQ_STATUS status;
+	BackgroundExecutorHashEntry *backgroundExecutorHashEntry;
+	foreach_htab(backgroundExecutorHashEntry, &status, currentExecutors)
+	{
+		hash_search(uniqueJobIds, &backgroundExecutorHashEntry->jobid, HASH_ENTER, NULL);
+	}
+
+	return uniqueJobIds;
+}
+
+
+/*
+ * CancelAllTaskExecutors cancels task executors given in the hash map.
+ */
+static void
+CancelAllTaskExecutors(HTAB *currentExecutors)
+{
+	StartTransactionCommand();
+	PushActiveSnapshot(GetTransactionSnapshot());
+
+	/* get unique job id set for running tasks in currentExecutors */
+	HTAB *uniqueJobIds = GetRunningUniqueJobIds(currentExecutors);
+
+	HASH_SEQ_STATUS status;
+	int64 *uniqueJobId;
+	foreach_htab(uniqueJobId, &status, uniqueJobIds)
+	{
+		ereport(DEBUG1, (errmsg("cancelling job: %ld", *uniqueJobId)));
+		Datum jobidDatum = Int64GetDatum(*uniqueJobId);
+		DirectFunctionCall1(citus_job_cancel, jobidDatum);
+	}
+
+	PopActiveSnapshot();
+	CommitTransactionCommand();
+}
+
+
 /*
  * CitusBackgroundTaskQueueMonitorMain is the main entry point for the background worker
  * running the background tasks queue monitor.
@@ -758,6 +900,18 @@ TaskEnded(TaskExecutionContext *taskExecutionContext)
 void
 CitusBackgroundTaskQueueMonitorMain(Datum arg)
 {
+	/* handle SIGTERM to properly terminate active task executors */
+	pqsignal(SIGTERM, QueueMonitorSigTermHandler);
+
+	/* handle SIGINT to properly cancel active task executors */
+	pqsignal(SIGINT, QueueMonitorSigIntHandler);
+
+	/* handle SIGHUP to update MaxBackgroundTaskExecutors */
+	pqsignal(SIGHUP, QueueMonitorSigHupHandler);
+
+	/* ready to handle signals */
+	BackgroundWorkerUnblockSignals();
+
 	Oid databaseOid = DatumGetObjectId(arg);

 	/* extension owner is passed via bgw_extra */
@@ -765,8 +919,6 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg)
 	memcpy_s(&extensionOwner, sizeof(extensionOwner),
 			 MyBgworkerEntry->bgw_extra, sizeof(Oid));

-	BackgroundWorkerUnblockSignals();
-
 	/* connect to database, after that we can actually access catalogs */
 	BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner, 0);
@@ -807,10 +959,6 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg)
 	 * cause conflicts on processing the tasks in the catalog table as well as violate
 	 * parallelism guarantees. To make sure there is at most, exactly one backend running
 	 * we take a session lock on the CITUS_BACKGROUND_TASK_MONITOR operation.
-	 *
-	 * TODO now that we have a lock, we should install a term handler to terminate any
-	 * 'child' backend when we are terminated. Otherwise we will still have a situation
-	 * where the actual task could be running multiple times.
 	 */

 	LOCKTAG tag = { 0 };
 	SET_LOCKTAG_CITUS_OPERATION(tag, CITUS_BACKGROUND_TASK_MONITOR);
@@ -841,11 +989,13 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg)
 	PopActiveSnapshot();
 	CommitTransactionCommand();

-	/* create a map to store parallel task executors */
+	/* create a map to store parallel task executors. Persist it in monitor memory context */
+	oldContext = MemoryContextSwitchTo(backgroundTaskContext);
 	HTAB *currentExecutors = CreateSimpleHashWithNameAndSize(int64,
 															 BackgroundExecutorHashEntry,
 															 "Background Executor Hash",
 															 MAX_BG_TASK_EXECUTORS);
+	MemoryContextSwitchTo(oldContext);

 	/*
 	 * monitor execution context that is useful during the monitor loop.
@@ -861,6 +1011,10 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg)
 		.ctx = backgroundTaskContext
 	};

+	/* flag to prevent duplicate termination and cancellation of task executors */
+	bool terminateExecutorsStarted = false;
+	bool cancelExecutorsStarted = false;
+
 	/* loop exits if there is no running or runnable tasks left */
 	bool hasAnyTask = true;
 	while (hasAnyTask)
@@ -868,15 +1022,47 @@ CitusBackgroundTaskQueueMonitorMain(Datum arg)
 		/* handle signals */
 		CHECK_FOR_INTERRUPTS();

+		/*
+		 * if the flag is set, we should terminate all task executor workers to prevent duplicate
+		 * runs of the same task on the next start of the monitor, which is dangerous for non-idempotent
+		 * tasks. We do not break the loop here as we want to reflect tasks' messages. Hence, we wait until
+		 * all tasks finish and also do not allow new runnable tasks to start running. After all current tasks
+		 * finish, we can exit the loop safely.
+		 */
+		if (GotSigterm && !terminateExecutorsStarted)
+		{
+			ereport(LOG, (errmsg("handling termination signal")));
+			terminateExecutorsStarted = true;
+			TerminateAllTaskExecutors(queueMonitorExecutionContext.currentExecutors);
+		}
+
+		if (GotSigint && !cancelExecutorsStarted)
+		{
+			ereport(LOG, (errmsg("handling cancellation signal")));
+			cancelExecutorsStarted = true;
+			CancelAllTaskExecutors(queueMonitorExecutionContext.currentExecutors);
+		}
+
+		if (GotSighup)
+		{
+			GotSighup = false;
+
+			/* update max_background_task_executors if changed */
+			ProcessConfigFile(PGC_SIGHUP);
+		}
+
 		/* invalidate cache for new data in catalog */
 		InvalidateMetadataSystemCache();

-		/* assign runnable tasks, if any, to new task executors in a transaction */
-		StartTransactionCommand();
-		PushActiveSnapshot(GetTransactionSnapshot());
-		AssignRunnableTasks(&queueMonitorExecutionContext);
-		PopActiveSnapshot();
-		CommitTransactionCommand();
+		/* assign runnable tasks, if any, to new task executors in a transaction if we do not have SIGTERM or SIGINT */
+		if (!MonitorGotTerminationOrCancellationRequest())
+		{
+			StartTransactionCommand();
+			PushActiveSnapshot(GetTransactionSnapshot());
+			AssignRunnableTasks(&queueMonitorExecutionContext);
+			PopActiveSnapshot();
+			CommitTransactionCommand();
+		}

 		/* get running task entries from hash table */
 		List *runningTaskEntries = GetRunningTaskEntries(
@@ -1268,7 +1454,8 @@ ConsumeTaskWorkerOutput(shm_mq_handle *responseq, StringInfo message, bool *hadE
  * environment to the executor.
  */
 static dsm_segment *
-StoreArgumentsInDSM(char *database, char *username, char *command, int64 taskId)
+StoreArgumentsInDSM(char *database, char *username, char *command,
+					int64 taskId, int64 jobId)
 {
 	/*
 	 * Create the shared memory that we will pass to the background
@@ -1284,6 +1471,7 @@ StoreArgumentsInDSM(char *database, char *username, char *command, int64 taskId)
 #define QUEUE_SIZE ((Size) 65536)
 	shm_toc_estimate_chunk(&e, QUEUE_SIZE);
 	shm_toc_estimate_chunk(&e, sizeof(int64));
+	shm_toc_estimate_chunk(&e, sizeof(int64));
 	shm_toc_estimate_keys(&e, CITUS_BACKGROUND_TASK_NKEYS);

 	Size segsize = shm_toc_estimate(&e);
@@ -1330,6 +1518,10 @@ StoreArgumentsInDSM(char *database, char *username, char *command, int64 taskId)
 	*taskIdTarget = taskId;
 	shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_TASK_ID, taskIdTarget);

+	int64 *jobIdTarget = shm_toc_allocate(toc, sizeof(int64));
+	*jobIdTarget = jobId;
+	shm_toc_insert(toc, CITUS_BACKGROUND_TASK_KEY_JOB_ID, jobIdTarget);
+
 	shm_mq_attach(mq, seg, NULL);

 	return seg;
@@ -1343,17 +1535,17 @@ StoreArgumentsInDSM(char *database, char *username, char *command, int64 taskId)
  * pointer to the dynamic shared memory.
  */
 static BackgroundWorkerHandle *
-StartCitusBackgroundTaskExecutor(char *database, char *user, char *command, int64 taskId,
-								 dsm_segment **pSegment)
+StartCitusBackgroundTaskExecutor(char *database, char *user, char *command,
+								 int64 taskId, int64 jobId, dsm_segment **pSegment)
 {
-	dsm_segment *seg = StoreArgumentsInDSM(database, user, command, taskId);
+	dsm_segment *seg = StoreArgumentsInDSM(database, user, command, taskId, jobId);

 	/* Configure a worker. */
 	BackgroundWorker worker = { 0 };
 	memset(&worker, 0, sizeof(worker));
 	SafeSnprintf(worker.bgw_name, BGW_MAXLEN,
-				 "Citus Background Task Queue Executor: %s/%s",
-				 database, user);
+				 "Citus Background Task Queue Executor: %s/%s for (%ld/%ld)",
+				 database, user, jobId, taskId);
 	worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
 	worker.bgw_start_time = BgWorkerStart_ConsistentState;
@@ -1391,6 +1583,8 @@ typedef struct CitusBackgroundJobExecutorErrorCallbackContext
 {
 	const char *database;
 	const char *username;
+	int64 taskId;
+	int64 jobId;
 } CitusBackgroundJobExecutorErrorCallbackContext;
@@ -1403,8 +1597,9 @@ CitusBackgroundJobExecutorErrorCallback(void *arg)
 {
 	CitusBackgroundJobExecutorErrorCallbackContext *context =
 		(CitusBackgroundJobExecutorErrorCallbackContext *) arg;
-	errcontext("Citus Background Task Queue Executor: %s/%s", context->database,
-			   context->username);
+	errcontext("Citus Background Task Queue Executor: %s/%s for (%ld/%ld)",
+			   context->database, context->username,
+			   context->jobId, context->taskId);
 }
@@ -1418,10 +1613,6 @@ CitusBackgroundJobExecutorErrorCallback(void *arg)
 void
 CitusBackgroundTaskExecutor(Datum main_arg)
 {
-	/*
-	 * TODO figure out if we need this signal handler that is in pgcron
-	 * pqsignal(SIGTERM, pg_cron_background_worker_sigterm);
-	 */
 	BackgroundWorkerUnblockSignals();

 	/* Set up a dynamic shared memory segment. */
@@ -1445,6 +1636,7 @@ CitusBackgroundTaskExecutor(Datum main_arg)
 	char *username = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_USERNAME, false);
 	char *command = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_COMMAND, false);
 	int64 *taskId = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_TASK_ID, false);
+	int64 *jobId = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_JOB_ID, false);
 	shm_mq *mq = shm_toc_lookup(toc, CITUS_BACKGROUND_TASK_KEY_QUEUE, false);

 	shm_mq_set_sender(mq, MyProc);
@@ -1456,6 +1648,8 @@ CitusBackgroundTaskExecutor(Datum main_arg)
 	CitusBackgroundJobExecutorErrorCallbackContext context = {
 		.database = database,
 		.username = username,
+		.taskId = *taskId,
+		.jobId = *jobId,
 	};
 	errorCallback.callback = CitusBackgroundJobExecutorErrorCallback;
 	errorCallback.arg = (void *) &context;
@@ -1482,8 +1676,8 @@ CitusBackgroundTaskExecutor(Datum main_arg)
 	/* Prepare to execute the query. */
 	SetCurrentStatementStartTimestamp();
 	debug_query_string = command;
-	char *appname = psprintf("citus background task queue executor (taskId %ld)",
-							 *taskId);
+	char *appname = psprintf("citus background task queue executor (%ld/%ld)",
+							 *jobId, *taskId);
 	pgstat_report_appname(appname);
 	pgstat_report_activity(STATE_RUNNING, command);
 	StartTransactionCommand();
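
Since the executor now reports jobId/taskId in its application_name, a plain catalog query can map backends to tasks; a hypothetical lookup:

```sql
SELECT pid, application_name
FROM pg_stat_activity
WHERE application_name LIKE 'citus background task queue executor (%/%)';
-- e.g.  12345 | citus background task queue executor (13/21)
```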


@@ -10,7 +10,11 @@ PG_LDFLAGS += $(LDFLAGS)
 include $(citus_top_builddir)/Makefile.global

 # We reuse all the Citus flags (incl. security flags), but we are building a program not a shared library
-override CFLAGS := $(filter-out -shared,$(CFLAGS))
+# We sometimes build Citus with a newer version of gcc than Postgres was built
+# with and this breaks LTO (link-time optimization). Even if disabling it can
+# have some perf impact this is ok because pg_send_cancellation is only used
+# for tests anyway.
+override CFLAGS := $(filter-out -shared, $(CFLAGS)) -fno-lto

 # Filter out unneeded dependencies
 override LIBS := $(filter-out -lz -lreadline -ledit -ltermcap -lncurses -lcurses -lpam, $(LIBS))
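
A minimal sketch (hypothetical flag values) of what the filter-out line leaves on the compiler command line:

```make
# demo.mk -- run with: make -f demo.mk
CFLAGS := -O2 -shared -fstack-protector
override CFLAGS := $(filter-out -shared, $(CFLAGS)) -fno-lto

all:
	@echo $(CFLAGS)   # prints: -O2 -fstack-protector -fno-lto
```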


@@ -28,6 +28,7 @@ typedef struct BackgroundExecutorHashEntry
 	BackgroundWorkerHandle *handle;
 	dsm_segment *seg;
+	int64 jobid;
 	StringInfo message;
 } BackgroundExecutorHashEntry;


@@ -385,6 +385,9 @@ extern bool IsReindexWithParam_compat(ReindexStmt *stmt, char *paramName);
 extern List * CreateExtensionStmtObjectAddress(Node *stmt, bool missing_ok, bool
 											   isPostprocess);

+/* owned.c - forward declarations */
+extern List * PreprocessDropOwnedStmt(Node *node, const char *queryString,
+									  ProcessUtilityContext processUtilityContext);
+
 /* policy.c - forward declarations */
 extern List * CreatePolicyCommands(Oid relationId);


@@ -196,6 +196,9 @@ extern char * DeparseCreateRoleStmt(Node *stmt);
 extern char * DeparseDropRoleStmt(Node *stmt);
 extern char * DeparseGrantRoleStmt(Node *stmt);

+/* forward declarations for deparse_owned_stmts.c */
+extern char * DeparseDropOwnedStmt(Node *node);
+
 /* forward declarations for deparse_extension_stmts.c */
 extern DefElem * GetExtensionOption(List *extensionOptions,
 									const char *defname);


@ -125,6 +125,12 @@ SELECT citus_job_cancel(:job_id);
(1 row) (1 row)
SELECT citus_job_wait(:job_id);
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id; SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id;
state | did_start state | did_start
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -242,6 +248,12 @@ SELECT citus_job_cancel(:job_id2); -- when a job with 1 task is cancelled, the l
(1 row) (1 row)
SELECT citus_job_wait(:job_id2); -- wait for the job to be cancelled
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_job_wait(:job_id3, desired_status => 'running'); SELECT citus_job_wait(:job_id3, desired_status => 'running');
citus_job_wait citus_job_wait
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -278,12 +290,6 @@ SELECT citus_job_wait(:job_id1);
(1 row) (1 row)
SELECT citus_job_wait(:job_id2);
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_job_wait(:job_id3); SELECT citus_job_wait(:job_id3);
citus_job_wait citus_job_wait
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -302,11 +308,262 @@ SELECT job_id, task_id, status FROM pg_dist_background_task
9 | 15 | cancelled 9 | 15 | cancelled
(5 rows) (5 rows)
-- verify that task is not starved by currently long running task -- verify that a task, previously not started due to lack of workers, is executed after we increase max worker count
BEGIN; BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id3 \gset
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id2 \gset INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id4 \gset
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id3 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id3, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id5 \gset
COMMIT;
SELECT pg_sleep(2); -- we assume this is enough time for all tasks to be in running status except the last one due to parallel worker limit
pg_sleep
---------------------------------------------------------------------
(1 row)
SELECT task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
ORDER BY task_id; -- show that last task is not running but ready to run(runnable)
task_id | status
---------------------------------------------------------------------
16 | running
17 | running
18 | running
19 | running
20 | runnable
(5 rows)
ALTER SYSTEM SET citus.max_background_task_executors TO 5;
SELECT pg_reload_conf(); -- the last runnable task will be running after change
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT citus_job_wait(:job_id3, desired_status => 'running');
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
ORDER BY task_id; -- show that last task is running
task_id | status
---------------------------------------------------------------------
16 | running
17 | running
18 | running
19 | running
20 | running
(5 rows)
SELECT citus_job_cancel(:job_id1);
citus_job_cancel
---------------------------------------------------------------------
(1 row)
SELECT citus_job_cancel(:job_id2);
citus_job_cancel
---------------------------------------------------------------------
(1 row)
SELECT citus_job_cancel(:job_id3);
citus_job_cancel
---------------------------------------------------------------------
(1 row)
SELECT citus_job_wait(:job_id1);
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_job_wait(:job_id2);
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_job_wait(:job_id3);
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
ORDER BY task_id; -- show that all tasks are cancelled
task_id | status
---------------------------------------------------------------------
16 | cancelled
17 | cancelled
18 | cancelled
19 | cancelled
20 | cancelled
(5 rows)
-- verify that upon termination signal, all tasks fail and retry policy sets their status back to runnable
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset
COMMIT;
SELECT citus_job_wait(:job_id1, desired_status => 'running');
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_job_wait(:job_id2, desired_status => 'running');
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2)
ORDER BY task_id;
task_id | status
---------------------------------------------------------------------
21 | running
22 | running
(2 rows)
SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset
SELECT pg_terminate_backend(:monitor_pid); -- terminate monitor process
pg_terminate_backend
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(2); -- wait enough to show that tasks are terminated
pg_sleep
---------------------------------------------------------------------
(1 row)
SELECT task_id, status, retry_count, message FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2)
ORDER BY task_id; -- show that all tasks are runnable by retry policy after termination signal
task_id | status | retry_count | message
---------------------------------------------------------------------
21 | runnable | 1 | FATAL: terminating background worker "Citus Background Task Queue Executor: regression/postgres for (13/21)" due to administrator command+
| | | CONTEXT: Citus Background Task Queue Executor: regression/postgres for (13/21) +
| | |
22 | runnable | 1 | FATAL: terminating background worker "Citus Background Task Queue Executor: regression/postgres for (14/22)" due to administrator command+
| | | CONTEXT: Citus Background Task Queue Executor: regression/postgres for (14/22) +
| | |
(2 rows)
SELECT citus_job_cancel(:job_id1);
citus_job_cancel
---------------------------------------------------------------------
(1 row)
SELECT citus_job_cancel(:job_id2);
citus_job_cancel
---------------------------------------------------------------------
(1 row)
SELECT citus_job_wait(:job_id1);
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_job_wait(:job_id2);
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2)
ORDER BY task_id; -- show that all tasks are cancelled
task_id | status
---------------------------------------------------------------------
21 | cancelled
22 | cancelled
(2 rows)
-- verify that upon cancellation signal, all tasks are cancelled
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset
COMMIT;
SELECT citus_job_wait(:job_id1, desired_status => 'running');
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_job_wait(:job_id2, desired_status => 'running');
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2)
ORDER BY task_id;
task_id | status
---------------------------------------------------------------------
23 | running
24 | running
(2 rows)
SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset
SELECT pg_cancel_backend(:monitor_pid); -- cancel monitor process
pg_cancel_backend
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(2); -- wait enough to show that tasks are cancelled
pg_sleep
---------------------------------------------------------------------
(1 row)
SELECT citus_job_wait(:job_id1);
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT citus_job_wait(:job_id2);
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2)
ORDER BY task_id; -- show that all tasks are cancelled
task_id | status
---------------------------------------------------------------------
23 | cancelled
24 | cancelled
(2 rows)
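Unlike the termination case above, cancelling the monitor with pg_cancel_backend leaves the tasks in a terminal cancelled state; no retry is attempted. A sketch of a query that distinguishes the two outcomes (the CASE labels are illustrative only):
SELECT task_id, status, retry_count,
       CASE
         WHEN status = 'runnable' AND retry_count > 0 THEN 'failed, will be retried'
         WHEN status = 'cancelled' THEN 'terminal, no retry'
       END AS interpretation
FROM pg_dist_background_task
ORDER BY task_id;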
-- verify that task is not starved by currently long running task
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT 1; $job$) RETURNING task_id AS task_id2 \gset
COMMIT;
SELECT citus_job_wait(:job_id1, desired_status => 'running');
@@ -326,8 +583,8 @@ SELECT job_id, task_id, status FROM pg_dist_background_task
ORDER BY job_id, task_id; -- show that last task is finished without starvation
 job_id | task_id | status
---------------------------------------------------------------------
-10 | 16 | running
-11 | 17 | done
+17 | 25 | running
+18 | 26 | done
(2 rows)
SELECT citus_job_cancel(:job_id1);
@@ -336,6 +593,21 @@ SELECT citus_job_cancel(:job_id1);
(1 row)
SELECT citus_job_wait(:job_id1);
citus_job_wait
---------------------------------------------------------------------
(1 row)
SELECT job_id, task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2)
ORDER BY job_id, task_id; -- show that task is cancelled
job_id | task_id | status
---------------------------------------------------------------------
17 | 25 | cancelled
18 | 26 | done
(2 rows)
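The point of this block is that the scheduler is fair across jobs: job 2's trivial SELECT 1 task finishes even while job 1's pg_sleep(5000) task is still occupying an executor. The status pair in the expected output is the whole assertion; a sketch of the same check against the live catalog:
SELECT job_id, task_id, status
FROM pg_dist_background_task
WHERE status IN ('running', 'done')
ORDER BY job_id, task_id; -- expect the long task 'running' and the short one 'done'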
SET client_min_messages TO WARNING;
DROP SCHEMA background_task_queue_monitor CASCADE;
ALTER SYSTEM RESET citus.background_task_queue_interval;
View File
@@ -525,8 +525,9 @@ NOTICE: cleaned up 11 orphaned resources
-- END: Split a partition table directly
-- BEGIN: Perform deferred cleanup.
SET client_min_messages TO WARNING;
CALL pg_catalog.citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 11 orphaned resources
+RESET client_min_messages;
-- END: Perform deferred cleanup.
-- BEGIN: Validate Shard Info and Data
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
View File
@@ -47,6 +47,11 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
(1 row)
-- BEGIN: Perform deferred cleanup.
SET client_min_messages TO WARNING;
CALL pg_catalog.citus_cleanup_orphaned_resources();
RESET client_min_messages;
-- END: Perform deferred cleanup.
\c - - - :worker_1_port
SET search_path TO "citus_split_test_schema";
SET citus.show_shards_for_app_name_prefixes = '*';
View File
@@ -237,8 +237,9 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
(1 row)
-- BEGIN: Perform deferred cleanup.
SET client_min_messages TO WARNING;
CALL pg_catalog.citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 3 orphaned resources
+RESET client_min_messages;
-- END: Perform deferred cleanup.
-- Perform 3 way split
SELECT pg_catalog.citus_split_shard_by_split_points(
@@ -253,8 +254,9 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
-- END : Split two shards : One with move and One without move.
-- BEGIN: Perform deferred cleanup.
SET client_min_messages TO WARNING;
CALL pg_catalog.citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 3 orphaned resources
+RESET client_min_messages;
-- END: Perform deferred cleanup.
-- BEGIN : Move a shard post split.
SELECT citus_move_shard_placement(8981007, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode:='block_writes');
@@ -474,7 +476,9 @@ ERROR: cannot use logical replication to transfer shards of the relation table_
DETAIL: UPDATE and DELETE commands on the shard will error out during logical replication unless there is a REPLICA IDENTITY or PRIMARY KEY.
HINT: If you wish to continue without a replica identity set the shard_transfer_mode to 'force_logical' or 'block_writes'.
-- BEGIN: Perform deferred cleanup.
SET client_min_messages TO WARNING;
CALL pg_catalog.citus_cleanup_orphaned_resources();
RESET client_min_messages;
-- END: Perform deferred cleanup.
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
FROM pg_dist_shard AS shard
@@ -522,8 +526,9 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
(1 row)
-- BEGIN: Perform deferred cleanup.
SET client_min_messages TO WARNING;
CALL pg_catalog.citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 3 orphaned resources
+RESET client_min_messages;
-- END: Perform deferred cleanup.
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
FROM pg_dist_shard AS shard
@@ -580,6 +585,28 @@ SELECT COUNT(*) FROM colocated_dist_table;
-- END: Validate Data Count
--BEGIN : Cleanup
\c - postgres - :master_port
-- make sure we don't have any replication objects leftover on the workers
SELECT run_command_on_workers($$SELECT count(*) FROM pg_replication_slots$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,0)
(localhost,57638,t,0)
(2 rows)
SELECT run_command_on_workers($$SELECT count(*) FROM pg_publication$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,0)
(localhost,57638,t,0)
(2 rows)
SELECT run_command_on_workers($$SELECT count(*) FROM pg_subscription$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,0)
(localhost,57638,t,0)
(2 rows)
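These three probes assert that non-blocking splits leave no logical-replication artifacts behind on the workers. If a single round trip is preferred over three, the checks can be folded into one command; a sketch, equivalent in effect, where a zero sum from each worker means no leftover slots, publications, or subscriptions:
SELECT run_command_on_workers($$
    SELECT (SELECT count(*) FROM pg_replication_slots)
         + (SELECT count(*) FROM pg_publication)
         + (SELECT count(*) FROM pg_subscription)
$$);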
ALTER SYSTEM RESET citus.defer_shard_delete_interval;
SELECT pg_reload_conf();
 pg_reload_conf
View File
@@ -325,8 +325,9 @@ SELECT run_command_on_workers($$DROP SUBSCRIPTION IF EXISTS citus_shard_move_sub
-- cleanup leftovers
-- verify we don't see any error for already dropped subscription
SET client_min_messages TO WARNING;
CALL citus_cleanup_orphaned_resources();
-NOTICE: cleaned up 3 orphaned resources
+RESET client_min_messages;
-- cancellation on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").cancel(' || :pid || ')');
 mitmproxy
View File
@@ -52,11 +52,11 @@ WARNING: failed to clean up 2 orphaned shards out of 5 after a citus_split_shar
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
 operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
-777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
+777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0
(3 rows)
@@ -101,7 +101,7 @@ CONTEXT: while executing command on localhost:xxxxx
CALL pg_catalog.citus_cleanup_orphaned_resources();
NOTICE: cleaned up 3 orphaned resources
SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
 operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
(0 rows)
@@ -155,11 +155,11 @@ NOTICE: cleaned up 3 orphaned resources
ERROR: Failed to run worker_split_shard_replication_setup UDF. It should successfully execute for splitting a shard in a non-blocking way. Please retry.
RESET client_min_messages;
SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
 operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
-777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
+777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0
(4 rows)
@@ -207,7 +207,7 @@ ERROR: Failed to run worker_split_shard_replication_setup UDF. It should succes
CALL pg_catalog.citus_cleanup_orphaned_resources();
NOTICE: cleaned up 4 orphaned resources
SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
 operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
(0 rows)
@@ -266,11 +266,11 @@ WARNING: failed to clean up 2 orphaned shards out of 7 after a citus_split_shar
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
 operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
-777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
+777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0
777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0
@@ -319,7 +319,7 @@ CONTEXT: while executing command on localhost:xxxxx
CALL pg_catalog.citus_cleanup_orphaned_resources();
NOTICE: cleaned up 5 orphaned resources
SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
 operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
(0 rows)
@@ -378,17 +378,17 @@ WARNING: failed to clean up 2 orphaned shards out of 12 after a citus_split_sha
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
 operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
-777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
+777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0
777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0
777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0
-777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0
777 | 2 | citus_shard_split_subscription_xxxxxxx | 2 | 0
+777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0
(8 rows)
-- we need to allow connection so that we can connect to proxy
@@ -437,7 +437,7 @@ CONTEXT: while executing command on localhost:xxxxx
CALL pg_catalog.citus_cleanup_orphaned_resources();
NOTICE: cleaned up 8 orphaned resources
SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
 operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
(0 rows)
@@ -496,17 +496,17 @@ WARNING: failed to clean up 2 orphaned shards out of 12 after a citus_split_sha
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
 operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
-777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
+777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0
777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0
777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0
-777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0
777 | 2 | citus_shard_split_subscription_xxxxxxx | 2 | 0
+777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0
(8 rows)
-- we need to allow connection so that we can connect to proxy
@@ -555,7 +555,7 @@ CONTEXT: while executing command on localhost:xxxxx
CALL pg_catalog.citus_cleanup_orphaned_resources();
NOTICE: cleaned up 8 orphaned resources
SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
 operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
(0 rows)
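Every pg_dist_cleanup query in this file gains ORDER BY object_name for the same reason: without an explicit sort, row order follows scan order and the expected output becomes flaky. The resulting idiom for catalog assertions in regression tests:
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup
WHERE operation_id = 777
ORDER BY object_name; -- deterministic output regardless of insertion order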
@@ -615,7 +615,7 @@ WARNING: connection to the remote node localhost:xxxxx failed with the followin
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
 operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1
@@ -624,8 +624,8 @@ CONTEXT: while executing command on localhost:xxxxx
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx | 2 | 0
777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0
777 | 3 | citus_shard_split_slot_xxxxxxx_xxxxxxx | 2 | 0
-777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0
777 | 2 | citus_shard_split_subscription_xxxxxxx | 2 | 0
+777 | 5 | citus_shard_split_subscription_role_10 | 2 | 0
(8 rows)
SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' order by relname;
@@ -678,7 +678,7 @@ CONTEXT: while executing command on localhost:xxxxx
CALL pg_catalog.citus_cleanup_orphaned_resources();
NOTICE: cleaned up 8 orphaned resources
SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
 operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
(0 rows)
View File
@@ -0,0 +1,38 @@
--
-- ISSUE_5763
--
-- Issue: DROP OWNED BY fails to drop the schemas on the workers
-- Link: https://github.com/citusdata/citus/issues/5763
--
CREATE USER issue_5763_1 WITH SUPERUSER;
CREATE USER issue_5763_2 WITH SUPERUSER;
\c - issue_5763_1 - :master_port
CREATE SCHEMA issue_5763_sc_1;
\c - issue_5763_2 - :master_port
CREATE SCHEMA issue_5763_sc_2;
\c - postgres - :master_port
DROP OWNED BY issue_5763_1, issue_5763_2;
\c - issue_5763_1 - :master_port
CREATE SCHEMA issue_5763_sc_1;
\c - postgres - :master_port
DROP SCHEMA issue_5763_sc_1;
DROP USER issue_5763_1, issue_5763_2;
-- test CASCADE options
CREATE USER issue_5763_3 WITH SUPERUSER;
\c - issue_5763_3 - :master_port
CREATE SCHEMA issue_5763_sc_3;
CREATE TABLE issue_5763_sc_3.tb1(id int);
\c - postgres - :master_port
DROP OWNED BY issue_5763_3 CASCADE;
DROP USER issue_5763_3;
-- test non-distributed role
SET citus.enable_create_role_propagation TO off;
CREATE USER issue_5763_4 WITH SUPERUSER;
NOTICE: not propagating CREATE ROLE/USER commands to worker nodes
HINT: Connect to worker nodes directly to manually create all necessary users and roles.
\c - issue_5763_4 - :master_port
set citus.enable_ddl_propagation = off;
CREATE SCHEMA issue_5763_sc_4;
\c - postgres - :master_port
DROP OWNED BY issue_5763_4 RESTRICT;
DROP USER issue_5763_4;
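The output above only shows the coordinator side; the bug in #5763 was that the schemas survived on the workers. A quick manual probe that the DROP OWNED BY actually propagated (a sketch; a zero count from each worker means the schemas are gone):
SELECT run_command_on_workers($$
    SELECT count(*) FROM pg_namespace WHERE nspname LIKE 'issue_5763_sc_%'
$$);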

View File
@@ -545,27 +545,6 @@ DROP TABLE
    test,
    test_coloc,
    colocation_table;
SELECT run_command_on_workers($$DROP OWNED BY full_access$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"DROP OWNED")
(localhost,57638,t,"DROP OWNED")
(2 rows)
SELECT run_command_on_workers($$DROP OWNED BY some_role$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"DROP OWNED")
(localhost,57638,t,"DROP OWNED")
(2 rows)
SELECT run_command_on_workers($$DROP OWNED BY read_access$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"DROP OWNED")
(localhost,57638,t,"DROP OWNED")
(2 rows)
DROP USER full_access;
DROP USER read_access;
DROP USER no_access;
View File
@@ -1154,13 +1154,6 @@ SET citus.next_shard_id TO 1197000;
-- we do not use run_command_on_coordinator_and_workers here because when there is CASCADE, it causes deadlock
DROP OWNED BY "test-user" CASCADE;
NOTICE: drop cascades to table schema_with_user.test_table
SELECT run_command_on_workers('DROP OWNED BY "test-user" CASCADE');
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"DROP OWNED")
(localhost,57638,t,"DROP OWNED")
(2 rows)
DROP USER "test-user"; DROP USER "test-user";
DROP FUNCTION run_command_on_coordinator_and_workers(p_sql text); DROP FUNCTION run_command_on_coordinator_and_workers(p_sql text);
-- test run_command_on_* UDFs with schema -- test run_command_on_* UDFs with schema

View File

@@ -1456,6 +1456,36 @@ NOTICE: drop cascades to 2 other objects
CREATE DATABASE db_with_oid OID 987654;
NOTICE: Citus partially supports CREATE DATABASE for distributed databases
DROP DATABASE db_with_oid;
-- SET ACCESS METHOD
-- Create a heap2 table am handler with heapam handler
CREATE ACCESS METHOD heap2 TYPE TABLE HANDLER heap_tableam_handler;
SELECT run_command_on_workers($$CREATE ACCESS METHOD heap2 TYPE TABLE HANDLER heap_tableam_handler$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"CREATE ACCESS METHOD")
(localhost,57638,t,"CREATE ACCESS METHOD")
(2 rows)
CREATE TABLE mx_ddl_table2 (
key int primary key,
value int
);
SELECT create_distributed_table('mx_ddl_table2', 'key', 'hash', shard_count=> 4);
create_distributed_table
---------------------------------------------------------------------
(1 row)
ALTER TABLE mx_ddl_table2 SET ACCESS METHOD heap2;
DROP TABLE mx_ddl_table2;
DROP ACCESS METHOD heap2;
SELECT run_command_on_workers($$DROP ACCESS METHOD heap2$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"DROP ACCESS METHOD")
(localhost,57638,t,"DROP ACCESS METHOD")
(2 rows)
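SET ACCESS METHOD is the PG15 feature exercised here; the heap2 handler is created on the workers first, presumably so the ALTER TABLE can take effect on the shard placements as well. A sketch for double-checking which access method a table ended up with, using only the standard pg_class and pg_am catalogs:
SELECT am.amname
FROM pg_class c
JOIN pg_am am ON am.oid = c.relam
WHERE c.relname = 'mx_ddl_table2';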
-- Clean up
\set VERBOSITY terse
SET client_min_messages TO ERROR;
View File
@@ -489,18 +489,6 @@ SET client_min_messages TO WARNING;
DROP SCHEMA single_node_ent CASCADE;
DROP OWNED BY full_access_single_node;
DROP OWNED BY read_access_single_node;
SELECT run_command_on_workers($$DROP OWNED BY full_access_single_node$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"DROP OWNED")
(1 row)
SELECT run_command_on_workers($$DROP OWNED BY read_access_single_node$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"DROP OWNED")
(1 row)
DROP ROLE full_access_single_node;
DROP ROLE read_access_single_node;
-- remove the nodes for next tests
View File
@@ -95,7 +95,7 @@ test: multi_dropped_column_aliases foreign_key_restriction_enforcement
test: binary_protocol
test: alter_table_set_access_method
test: alter_distributed_table
-test: issue_5248 issue_5099
+test: issue_5248 issue_5099 issue_5763
test: object_propagation_debug
test: undistribute_table
test: run_command_on_all_nodes
@@ -113,3 +113,4 @@ test: ensure_no_intermediate_data_leak
# --------
test: ensure_no_shared_connection_leak
test: check_mx
View File
@@ -54,6 +54,7 @@ SELECT status, pid, retry_count, NOT(message = '') AS has_message, (not_before >
-- test cancelling a failed/retrying job
SELECT citus_job_cancel(:job_id);
SELECT citus_job_wait(:job_id);
SELECT state, NOT(started_at IS NULL) AS did_start FROM pg_dist_background_job WHERE job_id = :job_id;
SELECT status, NOT(message = '') AS did_start FROM pg_dist_background_task WHERE job_id = :job_id ORDER BY task_id ASC;
@@ -110,6 +111,7 @@ SELECT job_id, task_id, status FROM pg_dist_background_task
ORDER BY job_id, task_id; -- show that last task is not running but ready to run(runnable)
SELECT citus_job_cancel(:job_id2); -- when a job with 1 task is cancelled, the last runnable task will be running
SELECT citus_job_wait(:job_id2); -- wait for the job to be cancelled
SELECT citus_job_wait(:job_id3, desired_status => 'running');
SELECT job_id, task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
@@ -118,18 +120,115 @@ SELECT job_id, task_id, status FROM pg_dist_background_task
SELECT citus_job_cancel(:job_id1);
SELECT citus_job_cancel(:job_id3);
SELECT citus_job_wait(:job_id1);
SELECT citus_job_wait(:job_id2);
SELECT citus_job_wait(:job_id3);
SELECT job_id, task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
ORDER BY job_id, task_id; -- show that multiple cancels worked
--- verify that task is not starved by currently long running task
+-- verify that a task, previously not started due to lack of workers, is executed after we increase max worker count
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id1 \gset
-INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset
+INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id3 \gset
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id4 \gset
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify max parallel background execution') RETURNING job_id AS job_id3 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id3, $job$ SELECT pg_sleep(5); $job$) RETURNING task_id AS task_id5 \gset
COMMIT;
SELECT pg_sleep(2); -- we assume this is enough time for all tasks to be in running status except the last one due to parallel worker limit
SELECT task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
ORDER BY task_id; -- show that last task is not running but ready to run(runnable)
ALTER SYSTEM SET citus.max_background_task_executors TO 5;
SELECT pg_reload_conf(); -- the last runnable task will be running after change
SELECT citus_job_wait(:job_id3, desired_status => 'running');
SELECT task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
ORDER BY task_id; -- show that last task is running
SELECT citus_job_cancel(:job_id1);
SELECT citus_job_cancel(:job_id2);
SELECT citus_job_cancel(:job_id3);
SELECT citus_job_wait(:job_id1);
SELECT citus_job_wait(:job_id2);
SELECT citus_job_wait(:job_id3);
SELECT task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2, :task_id3, :task_id4, :task_id5)
ORDER BY task_id; -- show that all tasks are cancelled
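The GUC driving this block is citus.max_background_task_executors: with the lower limit in place the fifth task stays runnable, and raising the limit lets the monitor pick it up after a plain configuration reload, no restart needed. Outside the test harness the same knob is turned like this (a sketch):
ALTER SYSTEM SET citus.max_background_task_executors TO 5;
SELECT pg_reload_conf();
SHOW citus.max_background_task_executors; -- confirm the new value is in effect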
-- verify that upon termination signal, all tasks fail and retry policy sets their status back to runnable
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify termination on monitor') RETURNING job_id AS job_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset
COMMIT;
SELECT citus_job_wait(:job_id1, desired_status => 'running');
SELECT citus_job_wait(:job_id2, desired_status => 'running');
SELECT task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2)
ORDER BY task_id;
SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset
SELECT pg_terminate_backend(:monitor_pid); -- terminate monitor process
SELECT pg_sleep(2); -- wait enough to show that tasks are terminated
SELECT task_id, status, retry_count, message FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2)
ORDER BY task_id; -- show that all tasks are runnable by retry policy after termination signal
SELECT citus_job_cancel(:job_id1);
SELECT citus_job_cancel(:job_id2);
SELECT citus_job_wait(:job_id1);
SELECT citus_job_wait(:job_id2);
SELECT task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2)
ORDER BY task_id; -- show that all tasks are cancelled
-- verify that upon cancellation signal, all tasks are cancelled
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify cancellation on monitor') RETURNING job_id AS job_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT pg_sleep(500); $job$) RETURNING task_id AS task_id2 \gset
COMMIT;
SELECT citus_job_wait(:job_id1, desired_status => 'running');
SELECT citus_job_wait(:job_id2, desired_status => 'running');
SELECT task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2)
ORDER BY task_id;
SELECT pid AS monitor_pid FROM pg_stat_activity WHERE application_name ~ 'task queue monitor' \gset
SELECT pg_cancel_backend(:monitor_pid); -- cancel monitor process
SELECT pg_sleep(2); -- wait enough to show that tasks are cancelled
SELECT citus_job_wait(:job_id1);
SELECT citus_job_wait(:job_id2);
SELECT task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2)
ORDER BY task_id; -- show that all tasks are cancelled
-- verify that task is not starved by currently long running task
BEGIN;
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id1 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id1, $job$ SELECT pg_sleep(5000); $job$) RETURNING task_id AS task_id1 \gset
INSERT INTO pg_dist_background_job (job_type, description) VALUES ('test_job', 'simple test to verify task execution starvation') RETURNING job_id AS job_id2 \gset
INSERT INTO pg_dist_background_task (job_id, command) VALUES (:job_id2, $job$ SELECT 1; $job$) RETURNING task_id AS task_id2 \gset
COMMIT;
@@ -139,6 +238,11 @@ SELECT job_id, task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2)
ORDER BY job_id, task_id; -- show that last task is finished without starvation
SELECT citus_job_cancel(:job_id1);
SELECT citus_job_wait(:job_id1);
SELECT job_id, task_id, status FROM pg_dist_background_task
WHERE task_id IN (:task_id1, :task_id2)
ORDER BY job_id, task_id; -- show that task is cancelled
SET client_min_messages TO WARNING;
DROP SCHEMA background_task_queue_monitor CASCADE;
View File
@@ -244,7 +244,9 @@ CALL pg_catalog.citus_cleanup_orphaned_resources();
-- END: Split a partition table directly
-- BEGIN: Perform deferred cleanup.
SET client_min_messages TO WARNING;
CALL pg_catalog.citus_cleanup_orphaned_resources();
RESET client_min_messages;
-- END: Perform deferred cleanup.
-- BEGIN: Validate Shard Info and Data
View File
@@ -46,6 +46,11 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
    ARRAY[:worker_2_node, :worker_2_node],
    'force_logical');
-- BEGIN: Perform deferred cleanup.
SET client_min_messages TO WARNING;
CALL pg_catalog.citus_cleanup_orphaned_resources();
RESET client_min_messages;
-- END: Perform deferred cleanup.
\c - - - :worker_1_port
SET search_path TO "citus_split_test_schema";
View File
@@ -149,7 +149,9 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
    'force_logical');
-- BEGIN: Perform deferred cleanup.
SET client_min_messages TO WARNING;
CALL pg_catalog.citus_cleanup_orphaned_resources();
RESET client_min_messages;
-- END: Perform deferred cleanup.
-- Perform 3 way split
@@ -161,7 +163,9 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
-- END : Split two shards : One with move and One without move.
-- BEGIN: Perform deferred cleanup.
SET client_min_messages TO WARNING;
CALL pg_catalog.citus_cleanup_orphaned_resources();
RESET client_min_messages;
-- END: Perform deferred cleanup.
-- BEGIN : Move a shard post split.
@@ -263,7 +267,9 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
    ARRAY[:worker_1_node, :worker_2_node]);
-- BEGIN: Perform deferred cleanup.
SET client_min_messages TO WARNING;
CALL pg_catalog.citus_cleanup_orphaned_resources();
RESET client_min_messages;
-- END: Perform deferred cleanup.
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
@@ -288,7 +294,9 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
    'auto');
-- BEGIN: Perform deferred cleanup.
SET client_min_messages TO WARNING;
CALL pg_catalog.citus_cleanup_orphaned_resources();
RESET client_min_messages;
-- END: Perform deferred cleanup.
SELECT shard.shardid, logicalrelid, shardminvalue, shardmaxvalue, nodename, nodeport
@@ -308,6 +316,11 @@ SELECT COUNT(*) FROM colocated_dist_table;
--BEGIN : Cleanup
\c - postgres - :master_port
-- make sure we don't have any replication objects leftover on the workers
SELECT run_command_on_workers($$SELECT count(*) FROM pg_replication_slots$$);
SELECT run_command_on_workers($$SELECT count(*) FROM pg_publication$$);
SELECT run_command_on_workers($$SELECT count(*) FROM pg_subscription$$);
ALTER SYSTEM RESET citus.defer_shard_delete_interval;
SELECT pg_reload_conf();
DROP SCHEMA "citus_split_test_schema" CASCADE;
View File
@@ -134,7 +134,9 @@ SELECT citus.mitmproxy('conn.allow()');
SELECT run_command_on_workers($$DROP SUBSCRIPTION IF EXISTS citus_shard_move_subscription_10$$);
-- cleanup leftovers
-- verify we don't see any error for already dropped subscription
SET client_min_messages TO WARNING;
CALL citus_cleanup_orphaned_resources();
RESET client_min_messages;
-- cancellation on dropping subscription
SELECT citus.mitmproxy('conn.onQuery(query="^DROP SUBSCRIPTION").cancel(' || :pid || ')');
View File
@@ -40,7 +40,7 @@ SELECT create_distributed_table('table_to_split', 'id');
    ARRAY[:worker_1_node, :worker_2_node],
    'force_logical');
SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
@@ -60,7 +60,7 @@ SELECT create_distributed_table('table_to_split', 'id');
\c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources();
SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
@@ -92,7 +92,7 @@ SELECT create_distributed_table('table_to_split', 'id');
RESET client_min_messages;
SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
@@ -111,7 +111,7 @@ SELECT create_distributed_table('table_to_split', 'id');
\c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources();
SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
@@ -138,7 +138,7 @@ SELECT create_distributed_table('table_to_split', 'id');
    ARRAY[:worker_1_node, :worker_2_node],
    'force_logical');
SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
@@ -157,7 +157,7 @@ SELECT create_distributed_table('table_to_split', 'id');
\c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources();
SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
@@ -184,7 +184,7 @@ SELECT create_distributed_table('table_to_split', 'id');
    ARRAY[:worker_1_node, :worker_2_node],
    'force_logical');
SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
@@ -203,7 +203,7 @@ SELECT create_distributed_table('table_to_split', 'id');
\c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources();
SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
@@ -230,7 +230,7 @@ SELECT create_distributed_table('table_to_split', 'id');
    ARRAY[:worker_1_node, :worker_2_node],
    'force_logical');
SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
@@ -249,7 +249,7 @@ SELECT create_distributed_table('table_to_split', 'id');
\c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources();
SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
@@ -277,7 +277,7 @@ SELECT create_distributed_table('table_to_split', 'id');
    'force_logical');
SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' order by relname;
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
@@ -297,7 +297,7 @@ SELECT create_distributed_table('table_to_split', 'id');
\c - postgres - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources();
SELECT operation_id, object_type, object_name, node_group_id, policy_type
-FROM pg_dist_cleanup where operation_id = 777;
+FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
View File
@@ -0,0 +1,50 @@
--
-- ISSUE_5763
--
-- Issue: DROP OWNED BY fails to drop the schemas on the workers
-- Link: https://github.com/citusdata/citus/issues/5763
--
CREATE USER issue_5763_1 WITH SUPERUSER;
CREATE USER issue_5763_2 WITH SUPERUSER;
\c - issue_5763_1 - :master_port
CREATE SCHEMA issue_5763_sc_1;
\c - issue_5763_2 - :master_port
CREATE SCHEMA issue_5763_sc_2;
\c - postgres - :master_port
DROP OWNED BY issue_5763_1, issue_5763_2;
\c - issue_5763_1 - :master_port
CREATE SCHEMA issue_5763_sc_1;
\c - postgres - :master_port
DROP SCHEMA issue_5763_sc_1;
DROP USER issue_5763_1, issue_5763_2;
-- test CASCADE options
CREATE USER issue_5763_3 WITH SUPERUSER;
\c - issue_5763_3 - :master_port
CREATE SCHEMA issue_5763_sc_3;
CREATE TABLE issue_5763_sc_3.tb1(id int);
\c - postgres - :master_port
DROP OWNED BY issue_5763_3 CASCADE;
DROP USER issue_5763_3;
-- test non-distributed role
SET citus.enable_create_role_propagation TO off;
CREATE USER issue_5763_4 WITH SUPERUSER;
\c - issue_5763_4 - :master_port
set citus.enable_ddl_propagation = off;
CREATE SCHEMA issue_5763_sc_4;
\c - postgres - :master_port
DROP OWNED BY issue_5763_4 RESTRICT;
DROP USER issue_5763_4;

View File
@@ -328,9 +328,6 @@ DROP TABLE
    test,
    test_coloc,
    colocation_table;
SELECT run_command_on_workers($$DROP OWNED BY full_access$$);
SELECT run_command_on_workers($$DROP OWNED BY some_role$$);
SELECT run_command_on_workers($$DROP OWNED BY read_access$$);
DROP USER full_access;
DROP USER read_access;
DROP USER no_access;
View File
@@ -842,7 +842,6 @@ SET citus.next_shard_id TO 1197000;
-- we do not use run_command_on_coordinator_and_workers here because when there is CASCADE, it causes deadlock
DROP OWNED BY "test-user" CASCADE;
SELECT run_command_on_workers('DROP OWNED BY "test-user" CASCADE');
DROP USER "test-user";
DROP FUNCTION run_command_on_coordinator_and_workers(p_sql text);
View File
@@ -924,6 +924,21 @@ DROP SERVER foreign_server CASCADE;
CREATE DATABASE db_with_oid OID 987654;
DROP DATABASE db_with_oid;
-- SET ACCESS METHOD
-- Create a heap2 table am handler with heapam handler
CREATE ACCESS METHOD heap2 TYPE TABLE HANDLER heap_tableam_handler;
SELECT run_command_on_workers($$CREATE ACCESS METHOD heap2 TYPE TABLE HANDLER heap_tableam_handler$$);
CREATE TABLE mx_ddl_table2 (
key int primary key,
value int
);
SELECT create_distributed_table('mx_ddl_table2', 'key', 'hash', shard_count=> 4);
ALTER TABLE mx_ddl_table2 SET ACCESS METHOD heap2;
DROP TABLE mx_ddl_table2;
DROP ACCESS METHOD heap2;
SELECT run_command_on_workers($$DROP ACCESS METHOD heap2$$);
-- Clean up
\set VERBOSITY terse
SET client_min_messages TO ERROR;
View File
@@ -313,8 +313,6 @@ DROP SCHEMA single_node_ent CASCADE;
DROP OWNED BY full_access_single_node;
DROP OWNED BY read_access_single_node;
SELECT run_command_on_workers($$DROP OWNED BY full_access_single_node$$);
SELECT run_command_on_workers($$DROP OWNED BY read_access_single_node$$);
DROP ROLE full_access_single_node;
DROP ROLE read_access_single_node;