diff --git a/.devcontainer/.gdbinit b/.devcontainer/.gdbinit index 9c710923f..9d544512b 100644 --- a/.devcontainer/.gdbinit +++ b/.devcontainer/.gdbinit @@ -3,3 +3,31 @@ # actually also works when debugging with vscode. Providing nice tools # to understand the internal datastructures we are working with. source /root/gdbpg.py + +# when debugging postgres it is convenient to _always_ have a breakpoint +# trigger when an error is logged. Because .gdbinit is sourced before gdb +# is fully attached and has the sources loaded, the breakpoint cannot be +# resolved yet. To make sure the breakpoint is added once the library is +# loaded we temporarily set breakpoint pending to on. After we have added +# our breakpoint we revert back to the default configuration for +# breakpoint pending. +# The breakpoint condition is hard to read, but at entry of the function we +# don't have the level loaded in elevel. Instead we hardcode the location +# where the level of the current error is stored. Also gdb doesn't +# understand the ERROR symbol so we hardcode this to the value of ERROR. +# It is very unlikely this value will ever change in postgres, but if it +# does we might need to find a way to conditionally set the correct +# breakpoint. +set breakpoint pending on +break elog.c:errfinish if errordata[errordata_stack_depth].elevel == 21 +set breakpoint pending auto + +echo \n +echo ----------------------------------------------------------------------------------\n +echo when attaching to a postgres backend a breakpoint will be set on elog.c:errfinish \n +echo it will only break on errors being raised in postgres \n +echo \n +echo to disable this breakpoint from vscode run `-exec disable 1` in the debug console \n +echo this assumes it's the first breakpoint loaded as it is loaded from .gdbinit \n +echo this can be verified with `-exec info break`, enabling can be done with \n +echo `-exec enable 1` \n +echo ----------------------------------------------------------------------------------\n +echo \n diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 5944c38db..1f22ff034 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -13,10 +13,33 @@ on: pull_request: types: [opened, reopened,synchronize] jobs: + # Since GHA does not interpolate env variables in matrix context, we need to + # define them in a separate job and use them in other jobs. + params: + runs-on: ubuntu-latest + name: Initialize parameters + outputs: + build_image_name: "citus/extbuilder" + test_image_name: "citus/exttester" + citusupgrade_image_name: "citus/citusupgradetester" + fail_test_image_name: "citus/failtester" + pgupgrade_image_name: "citus/pgupgradetester" + style_checker_image_name: "citus/stylechecker" + style_checker_tools_version: "0.8.18" + image_suffix: "-v9d71045" + pg14_version: "14.9" + pg15_version: "15.4" + pg16_version: "16.0" + upgrade_pg_versions: "14.9-15.4-16.0" + steps: + # Since GHA jobs need at least one step we use a noop step here.
+ - name: Set up parameters + run: echo 'noop' check-sql-snapshots: + needs: params runs-on: ubuntu-20.04 container: - image: ${{ vars.build_image_name }}:latest + image: ${{ needs.params.outputs.build_image_name }}:latest options: --user root steps: - uses: actions/checkout@v3.5.0 @@ -25,9 +48,10 @@ jobs: git config --global --add safe.directory ${GITHUB_WORKSPACE} ci/check_sql_snapshots.sh check-style: + needs: params runs-on: ubuntu-20.04 container: - image: ${{ vars.style_checker_image_name }}:${{ vars.style_checker_tools_version }}${{ vars.image_suffix }} + image: ${{ needs.params.outputs.style_checker_image_name }}:${{ needs.params.outputs.style_checker_tools_version }}${{ needs.params.outputs.image_suffix }} steps: - name: Check Snapshots run: | @@ -68,18 +92,19 @@ jobs: - name: Check for missing downgrade scripts run: ci/check_migration_files.sh build: + needs: params name: Build for PG ${{ matrix.pg_version}} strategy: fail-fast: false matrix: image_name: - - ${{ vars.build_image_name }} + - ${{ needs.params.outputs.build_image_name }} image_suffix: - - ${{ vars.image_suffix}} + - ${{ needs.params.outputs.image_suffix}} pg_version: - - ${{ vars.pg14_version }} - - ${{ vars.pg15_version }} - - ${{ vars.pg16_version }} + - ${{ needs.params.outputs.pg14_version }} + - ${{ needs.params.outputs.pg15_version }} + - ${{ needs.params.outputs.pg16_version }} runs-on: ubuntu-20.04 container: image: "${{ matrix.image_name }}:${{ matrix.pg_version }}${{ matrix.image_suffix }}" @@ -106,11 +131,11 @@ jobs: suite: - regress image_name: - - ${{ vars.test_image_name }} + - ${{ needs.params.outputs.test_image_name }} pg_version: - - ${{ vars.pg14_version }} - - ${{ vars.pg15_version }} - - ${{ vars.pg16_version }} + - ${{ needs.params.outputs.pg14_version }} + - ${{ needs.params.outputs.pg15_version }} + - ${{ needs.params.outputs.pg16_version }} make: - check-split - check-multi @@ -129,69 +154,70 @@ jobs: - check-enterprise-isolation-logicalrep-3 include: - make: check-failure - pg_version: ${{ vars.pg14_version }} + pg_version: ${{ needs.params.outputs.pg14_version }} suite: regress - image_name: ${{ vars.fail_test_image_name }} + image_name: ${{ needs.params.outputs.fail_test_image_name }} - make: check-failure - pg_version: ${{ vars.pg15_version }} + pg_version: ${{ needs.params.outputs.pg15_version }} suite: regress - image_name: ${{ vars.fail_test_image_name }} + image_name: ${{ needs.params.outputs.fail_test_image_name }} - make: check-failure - pg_version: ${{ vars.pg16_version }} + pg_version: ${{ needs.params.outputs.pg16_version }} suite: regress - image_name: ${{ vars.fail_test_image_name }} + image_name: ${{ needs.params.outputs.fail_test_image_name }} - make: check-enterprise-failure - pg_version: ${{ vars.pg14_version }} + pg_version: ${{ needs.params.outputs.pg14_version }} suite: regress - image_name: ${{ vars.fail_test_image_name }} + image_name: ${{ needs.params.outputs.fail_test_image_name }} - make: check-enterprise-failure - pg_version: ${{ vars.pg15_version }} + pg_version: ${{ needs.params.outputs.pg15_version }} suite: regress - image_name: ${{ vars.fail_test_image_name }} + image_name: ${{ needs.params.outputs.fail_test_image_name }} - make: check-enterprise-failure - pg_version: ${{ vars.pg16_version }} + pg_version: ${{ needs.params.outputs.pg16_version }} suite: regress - image_name: ${{ vars.fail_test_image_name }} + image_name: ${{ needs.params.outputs.fail_test_image_name }} - make: check-pytest - pg_version: ${{ vars.pg14_version }} + pg_version: ${{ 
needs.params.outputs.pg14_version }} suite: regress - image_name: ${{ vars.fail_test_image_name }} + image_name: ${{ needs.params.outputs.fail_test_image_name }} - make: check-pytest - pg_version: ${{ vars.pg15_version }} + pg_version: ${{ needs.params.outputs.pg15_version }} suite: regress - image_name: ${{ vars.fail_test_image_name }} + image_name: ${{ needs.params.outputs.fail_test_image_name }} - make: check-pytest - pg_version: ${{ vars.pg16_version }} + pg_version: ${{ needs.params.outputs.pg16_version }} suite: regress - image_name: ${{ vars.fail_test_image_name }} + image_name: ${{ needs.params.outputs.fail_test_image_name }} - make: installcheck suite: cdc - image_name: ${{ vars.test_image_name }} - pg_version: ${{ vars.pg15_version }} + image_name: ${{ needs.params.outputs.test_image_name }} + pg_version: ${{ needs.params.outputs.pg15_version }} - make: installcheck suite: cdc - image_name: ${{ vars.test_image_name }} - pg_version: ${{ vars.pg16_version }} + image_name: ${{ needs.params.outputs.test_image_name }} + pg_version: ${{ needs.params.outputs.pg16_version }} - make: check-query-generator - pg_version: ${{ vars.pg14_version }} + pg_version: ${{ needs.params.outputs.pg14_version }} suite: regress - image_name: ${{ vars.fail_test_image_name }} + image_name: ${{ needs.params.outputs.fail_test_image_name }} - make: check-query-generator - pg_version: ${{ vars.pg15_version }} + pg_version: ${{ needs.params.outputs.pg15_version }} suite: regress - image_name: ${{ vars.fail_test_image_name }} + image_name: ${{ needs.params.outputs.fail_test_image_name }} - make: check-query-generator - pg_version: ${{ vars.pg16_version }} + pg_version: ${{ needs.params.outputs.pg16_version }} suite: regress - image_name: ${{ vars.fail_test_image_name }} + image_name: ${{ needs.params.outputs.fail_test_image_name }} runs-on: ubuntu-20.04 container: - image: "${{ matrix.image_name }}:${{ matrix.pg_version }}${{ vars.image_suffix }}" + image: "${{ matrix.image_name }}:${{ matrix.pg_version }}${{ needs.params.outputs.image_suffix }}" options: --user root --dns=8.8.8.8 # Due to Github creates a default network for each job, we need to use # --dns= to have similar DNS settings as our other CI systems or local # machines. Otherwise, we may see different results. 
needs: + - params - build steps: - uses: actions/checkout@v3.5.0 @@ -212,19 +238,20 @@ jobs: name: PG${{ matrix.pg_version }} - check-arbitrary-configs-${{ matrix.parallel }} runs-on: ["self-hosted", "1ES.Pool=1es-gha-citusdata-pool"] container: - image: "${{ matrix.image_name }}:${{ matrix.pg_version }}${{ vars.image_suffix }}" + image: "${{ matrix.image_name }}:${{ matrix.pg_version }}${{ needs.params.outputs.image_suffix }}" options: --user root needs: + - params - build strategy: fail-fast: false matrix: image_name: - - ${{ vars.fail_test_image_name }} + - ${{ needs.params.outputs.fail_test_image_name }} pg_version: - - ${{ vars.pg14_version }} - - ${{ vars.pg15_version }} - - ${{ vars.pg16_version }} + - ${{ needs.params.outputs.pg14_version }} + - ${{ needs.params.outputs.pg15_version }} + - ${{ needs.params.outputs.pg16_version }} parallel: [0,1,2,3,4,5] # workaround for running 6 parallel jobs steps: - uses: actions/checkout@v3.5.0 @@ -258,9 +285,10 @@ jobs: name: PG${{ matrix.old_pg_major }}-PG${{ matrix.new_pg_major }} - check-pg-upgrade runs-on: ubuntu-20.04 container: - image: "${{ vars.pgupgrade_image_name }}:${{ vars.upgrade_pg_versions }}${{ vars.image_suffix }}" + image: "${{ needs.params.outputs.pgupgrade_image_name }}:${{ needs.params.outputs.upgrade_pg_versions }}${{ needs.params.outputs.image_suffix }}" options: --user root needs: + - params - build strategy: fail-fast: false @@ -305,12 +333,13 @@ jobs: flags: ${{ env.old_pg_major }}_${{ env.new_pg_major }}_upgrade codecov_token: ${{ secrets.CODECOV_TOKEN }} test-citus-upgrade: - name: PG${{ vars.pg14_version }} - check-citus-upgrade + name: PG${{ needs.params.outputs.pg14_version }} - check-citus-upgrade runs-on: ubuntu-20.04 container: - image: "${{ vars.citusupgrade_image_name }}:${{ vars.pg14_version }}${{ vars.image_suffix }}" + image: "${{ needs.params.outputs.citusupgrade_image_name }}:${{ needs.params.outputs.pg14_version }}${{ needs.params.outputs.image_suffix }}" options: --user root needs: + - params - build steps: - uses: actions/checkout@v3.5.0 @@ -354,8 +383,9 @@ jobs: CC_TEST_REPORTER_ID: ${{ secrets.CC_TEST_REPORTER_ID }} runs-on: ubuntu-20.04 container: - image: ${{ vars.test_image_name }}:${{ vars.pg16_version }}${{ vars.image_suffix }} + image: ${{ needs.params.outputs.test_image_name }}:${{ needs.params.outputs.pg16_version }}${{ needs.params.outputs.image_suffix }} needs: + - params - test-citus - test-arbitrary-configs - test-citus-upgrade @@ -448,11 +478,12 @@ jobs: name: Test flakyness runs-on: ubuntu-20.04 container: - image: ${{ vars.fail_test_image_name }}:${{ vars.pg16_version }}${{ vars.image_suffix }} + image: ${{ needs.params.outputs.fail_test_image_name }}:${{ needs.params.outputs.pg16_version }}${{ needs.params.outputs.image_suffix }} options: --user root env: runs: 8 needs: + - params - build - test-flakyness-pre - prepare_parallelization_matrix_32 diff --git a/.github/workflows/packaging-test-pipelines.yml b/.github/workflows/packaging-test-pipelines.yml index 9d3fb81be..0fb4b7092 100644 --- a/.github/workflows/packaging-test-pipelines.yml +++ b/.github/workflows/packaging-test-pipelines.yml @@ -24,14 +24,14 @@ jobs: - name: Get Postgres Versions id: get-postgres-versions run: | - # Postgres versions are stored in .circleci/config.yml file in "build-[pg-version] format. Below command - # extracts the versions and get the unique values. 
- pg_versions=`grep -Eo 'build-[[:digit:]]{2}' .circleci/config.yml|sed -e "s/^build-//"|sort|uniq|tr '\n' ','| head -c -1` + # Postgres versions are stored in .github/workflows/build_and_test.yml file in "pg[pg-version]_version" + # format. The command below extracts the versions and keeps the unique values. + pg_versions=$(cat .github/workflows/build_and_test.yml | grep -oE 'pg[0-9]+_version: "[0-9.]+"' | sed -E 's/pg([0-9]+)_version: "([0-9.]+)"/\1/g' | sort | uniq | tr '\n' ',') pg_versions_array="[ ${pg_versions} ]" echo "Supported PG Versions: ${pg_versions_array}" # Below line is needed to set the output variable to be used in the next job echo "pg_versions=${pg_versions_array}" >> $GITHUB_OUTPUT - + shell: bash rpm_build_tests: name: rpm_build_tests needs: get_postgres_versions_from_file diff --git a/src/backend/distributed/README.md b/src/backend/distributed/README.md index 0a3164e0f..7c4f43add 100644 --- a/src/backend/distributed/README.md +++ b/src/backend/distributed/README.md @@ -245,6 +245,7 @@ CREATE TABLE country_codes ( country_code VARCHAR(3) PRIMARY KEY, country_name VARCHAR(50) ); +SELECT create_reference_table('country_codes'); -- Reference Table: Order Status CREATE TABLE order_status ( @@ -269,14 +270,17 @@ The aim of this planner is to avoid relying on PostgreSQL's standard_planner() f ### Main C Functions Involved: -- `FastPathRouterPlan()`: The primary function for creating the fast-path query plan. +- `FastPathPlanner()`: The primary function for creating the fast-path query plan. - `FastPathRouterQuery()`: Validates if a query is eligible for fast-path routing by checking its structure and the WHERE clause. With set client_min_messages to debug4; you should see the following in the DEBUG messages: "DEBUG: Distributed planning for a fast-path router query" ```sql -- Fetches the count of users born in the same year, but only --- for a single country +-- for a single country, with a filter on the distribution column +-- Normally we have a single user with id = 15 because user_id is a PRIMARY KEY; +-- this is just to demonstrate that fast-path can handle complex queries +-- with EXTRACT(), COUNT(), GROUP BY, HAVING, etc. SELECT EXTRACT(YEAR FROM date_of_birth) as birth_year, COUNT(*) FROM users_table WHERE country_code = 'USA' AND user_id = 15 @@ -382,11 +386,10 @@ FROM users_table u, orders_table o WHERE u.user_id = o.user_id AND u.user_id = 42; -- With Subqueries: - -- Fetch the username and their total order amount -- for a specific user SELECT u.username, - (SELECT MAX(o.product_id) FROM orders_table o + (SELECT COUNT(*) FROM orders_table o WHERE o.user_id = 42 AND o.user_id = u.user_id) FROM users_table u @@ -692,7 +695,7 @@ Assume that there are two subqueries; each subquery is individually joined on the -- The join condition between them is: sub1.user_id != sub2.user_id, which does not preserve distribution key equality. -- Citus qualifies sub1 as the anchor subquery and checks whether all other subqueries are joined on the distribution key. -- In this case, sub2 is not joined on the distribution key, so Citus decides to recursively plan the whole sub2. -SELECT a.user_id, b.user_id +SELECT sub1.user_id, sub2.user_id FROM ( SELECT u.user_id FROM users_table u @@ -884,7 +887,7 @@ Citus has a rules-based optimizer. The core function `MultiLogicalPlanCreate()` For instance, one simple optimization pushes the "filter" operation below the "MultiCollect." Such rules are defined in the function `Commutative()` in `multi_logical_optimizer.c`.
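+As a hypothetical illustration (using the `orders_table` schema above), consider how pushing a filter below the `MultiCollect` plays out: each worker evaluates the `WHERE` clause on its own shards, so only matching rows are collected on the coordinator.
+
+```sql
+-- Conceptually: Filter(Collect(orders_table)) is rewritten to
+-- Collect(Filter(orders_table)), so the filter runs on the workers
+-- and only qualifying rows travel to the coordinator.
+SELECT order_id, user_id
+FROM orders_table
+WHERE order_date < '2023-01-01';
+```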
-The most interesting part of the optimizer is usually in the final stage, when handling the more complex operators (GROUP BY, DISTINCT window functions, ORDER BY, aggregates). These operators are conjoined in a `MultiExtendedOpNode`. In many cases, they can only partially be pushed down into the worker nodes, which results in one `MultiExtendedOpNode` above the `MultiCollection` (which will run on the coordinator and aggregates across worker nodes), and another `MultiExtendedOpNode` below the `MultiCollect` (which will be pushed down to worker nodes). The bulk of the logic for generating the two nodes lives in `MasterExtendedOpNode()` and `WorkerExtendedOpNode()`, respectively. +The most interesting part of the optimizer is usually in the final stage, when handling the more complex operators (GROUP BY, DISTINCT, window functions, ORDER BY, aggregates). These operators are conjoined in a `MultiExtendedOpNode`. In many cases, they can only partially be pushed down to the worker nodes, which results in one `MultiExtendedOpNode` above the `MultiCollect` (which runs on the coordinator and aggregates across worker nodes), and another `MultiExtendedOpNode` below the `MultiCollect` (which is pushed down to the worker nodes). The bulk of the logic for generating the two nodes lives in `MasterExtendedOpNode()` and `WorkerExtendedOpNode()`, respectively. ##### Aggregate functions @@ -1034,8 +1037,8 @@ SELECT * FROM cte_1; -- but as the same cte used twice -- Citus converts the CTE to intermediate result WITH cte_1 AS (SELECT DISTINCT user_id FROM orders_table) -SELECT * FROM cte_1 as c1 JOIN - cte_1 as c2 USING (user_id); +SELECT * FROM cte_1 as c1 + JOIN cte_1 as c2 USING (user_id); ``` - **Citus Specific Materialization**: @@ -1051,8 +1054,7 @@ As of writing this document, Citus does NOT support ```sql WITH users_that_have_orders AS (SELECT users_table.* FROM users_table JOIN orders_table USING (user_id)) -SELECT - max(date_of_birth) +SELECT max(date_of_birth) FROM users_that_have_orders GROUP BY GROUPING SETS (user_id, email); ... @@ -1099,7 +1101,7 @@ INSERT INTO orders_table (order_id, user_id) VALUES ``` **Debug Info**: - Debug information shows how the query is rebuilt for different user_ids. + Debug information shows how the query is rebuilt for different user_ids. Here, the shard_count is 4. ```sql -- for user_id: 1 DEBUG: query after rebuilding: INSERT INTO public.orders_table_102041 AS citus_table_alias (order_id, user_id) VALUES ('1'::bigint,'1'::bigint), ('3'::bigint,'1'::bigint) @@ -1133,7 +1135,7 @@ DEBUG: query after rebuilding: INSERT INTO public.orders_table_102064 AS citus **Examples**: The following section will delve into examples, starting with simple ones and moving to more complex scenarios. -### INSERT.. SELECT Advanced Scenarios +### INSERT.. SELECT Query Planning **Overview**: The `INSERT .. SELECT` pushdown logic builds upon the pushdown planning for `SELECT` commands. The key requirements include colocated tables and matching distribution columns. Relevant C functions are `CreateDistributedInsertSelectPlan`, `DistributedInsertSelectSupported()`, and `AllDistributionKeysInQueryAreEqual`. @@ -1267,7 +1269,7 @@ WHERE user_id IN (SELECT user_id FROM high_value_users); Used for more complex queries, like those with subqueries or joins that can't be pushed down. The queries are planned recursively.
```sql DELETE FROM users_table WHERE user_id -IN (SELECT user_id FROM orders_table WHERE total > 100 ORDER BY total DESC LIMIT 5); +IN (SELECT user_id FROM orders_table WHERE order_date < '2023-01-01' ORDER BY order_date LIMIT 5); ``` ### Correlated/Lateral Subqueries in Planning @@ -1279,8 +1281,7 @@ Correlated or LATERAL subqueries have special behavior in Citus. They can often **Key Code Details**: For more information on the code, check the following functions: `DeferErrorIfCannotPushdownSubquery()` -> - `ContainsReferencesToOuterQuery()` -> - `DeferErrorIfSubqueryRequiresMerge()`. + `ContainsReferencesToOuterQuery()`, `DeferErrorIfSubqueryRequiresMerge()`, `DeferredErrorIfUnsupportedLateralSubquery()`. LATERAL queries are different/unique: even if the subquery requires a merge step such as a `LIMIT`, if the correlation is on the distribution column, we can push it down. See [#4385](https://github.com/citusdata/citus/pull/4385). @@ -1409,7 +1410,7 @@ WITH recent_orders AS ( ) SELECT u.* FROM users_table u -JOIN recent_orders o ON u.user_id = o.product_id; +JOIN recent_orders o ON u.user_id = o.product_id JOIN orders_table o2 ON o2.product_id = o.product_id; ERROR: complex joins are only supported when all distributed tables are co-located and joined on their distribution columns ``` diff --git a/src/backend/distributed/commands/common.c b/src/backend/distributed/commands/common.c index 797981d47..9a87df9f1 100644 --- a/src/backend/distributed/commands/common.c +++ b/src/backend/distributed/commands/common.c @@ -28,6 +28,8 @@ #include "distributed/metadata/distobject.h" #include "distributed/multi_executor.h" #include "distributed/worker_transaction.h" +#include "catalog/pg_database.h" +#include "commands/dbcommands.h" /* diff --git a/src/backend/distributed/commands/database.c b/src/backend/distributed/commands/database.c index 7e90cbc9e..aab3b44f6 100644 --- a/src/backend/distributed/commands/database.c +++ b/src/backend/distributed/commands/database.c @@ -35,6 +35,14 @@ #include "distributed/deparse_shard_query.h" #include "distributed/listutils.h" #include "distributed/adaptive_executor.h" +#include "access/htup_details.h" +#include "catalog/pg_tablespace.h" +#include "access/heapam.h" +#include "utils/relcache.h" +#include "utils/rel.h" +#include "utils/lsyscache.h" +#include "catalog/pg_collation.h" +#include "catalog/pg_database_d.h" static AlterOwnerStmt * RecreateAlterDatabaseOwnerStmt(Oid databaseOid); @@ -296,6 +305,7 @@ PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString, return NIL; } + AlterDatabaseSetStmt *stmt = castNode(AlterDatabaseSetStmt, node); EnsureCoordinator(); @@ -310,6 +320,24 @@ } +/* + * PreprocessCreateDatabaseStmt is executed before the CREATE DATABASE statement + * is applied to the local postgres instance. At this stage we only validate that + * the statement can be deparsed; propagation happens in the postprocess step. + */ +List * +PreprocessCreateDatabaseStmt(Node *node, const char *queryString, + ProcessUtilityContext processUtilityContext) +{ + if (!EnableCreateDatabasePropagation || !ShouldPropagate()) + { + return NIL; + } + + EnsureCoordinator(); + + /* validate the statement */ + DeparseTreeNode(node); + + return NIL; +} + + /* * PostprocessCreatedbStmt is executed after the statement is applied to the local * postgres instance.
In this stage we can prepare the commands that need to be run on @@ -328,16 +356,11 @@ PostprocessCreateDatabaseStmt(Node *node, const char *queryString) char *createDatabaseCommand = DeparseTreeNode(node); - StringInfo internalCreateCommand = makeStringInfo(); - appendStringInfo(internalCreateCommand, - "SELECT pg_catalog.citus_internal_database_command(%s)", - quote_literal_cstr(createDatabaseCommand)); - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) internalCreateCommand->data, + (void *) createDatabaseCommand, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); + return NontransactionalNodeDDLTask(NON_COORDINATOR_NODES, commands); } @@ -412,6 +435,7 @@ List * PreprocessDropDatabaseStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext) { + bool isPostProcess = false; if (!EnableCreateDatabasePropagation || !ShouldPropagate()) { return NIL; } @@ -421,36 +445,48 @@ PreprocessDropDatabaseStmt(Node *node, const char *queryString, DropdbStmt *stmt = (DropdbStmt *) node; - Oid databaseOid = get_database_oid(stmt->dbname, stmt->missing_ok); + List *addresses = GetObjectAddressListFromParseTree(node, stmt->missing_ok, + isPostProcess); - if (databaseOid == InvalidOid) - { - /* let regular ProcessUtility deal with IF NOT EXISTS */ - return NIL; - } - - ObjectAddress dbAddress = { 0 }; - ObjectAddressSet(dbAddress, DatabaseRelationId, databaseOid); - if (!IsObjectDistributed(&dbAddress)) + if (list_length(addresses) == 0) { return NIL; } - UnmarkObjectDistributed(&dbAddress); + ObjectAddress *address = (ObjectAddress *) linitial(addresses); + if (address->objectId == InvalidOid || !IsObjectDistributed(address)) + { + return NIL; + } char *dropDatabaseCommand = DeparseTreeNode(node); - StringInfo internalDropCommand = makeStringInfo(); - appendStringInfo(internalDropCommand, - "SELECT pg_catalog.citus_internal_database_command(%s)", - quote_literal_cstr(dropDatabaseCommand)); - List *commands = list_make3(DISABLE_DDL_PROPAGATION, - (void *) internalDropCommand->data, + (void *) dropDatabaseCommand, ENABLE_DDL_PROPAGATION); - return NodeDDLTaskList(NON_COORDINATOR_NODES, commands); + return NontransactionalNodeDDLTask(NON_COORDINATOR_NODES, commands); +} + + +/* + * GetDatabaseAddressFromDatabaseName returns the ObjectAddress of the database + * with the given name; lookup failure is handled according to missingOk. + */ +static ObjectAddress * +GetDatabaseAddressFromDatabaseName(char *databaseName, bool missingOk) +{ + Oid databaseOid = get_database_oid(databaseName, missingOk); + ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress)); + ObjectAddressSet(*dbAddress, DatabaseRelationId, databaseOid); + return dbAddress; +} + + +/* + * DropDatabaseStmtObjectAddress returns the ObjectAddress of the database that + * is the subject of the given DropdbStmt. + */ +List * +DropDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess) +{ + DropdbStmt *stmt = castNode(DropdbStmt, node); + ObjectAddress *dbAddress = GetDatabaseAddressFromDatabaseName(stmt->dbname, + missing_ok); + return list_make1(dbAddress); } @@ -458,9 +494,282 @@ List * CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess) { CreatedbStmt *stmt = castNode(CreatedbStmt, node); - Oid databaseOid = get_database_oid(stmt->dbname, missing_ok); - ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress)); - ObjectAddressSet(*dbAddress, DatabaseRelationId, databaseOid); - + ObjectAddress *dbAddress = GetDatabaseAddressFromDatabaseName(stmt->dbname, + missing_ok); return list_make1(dbAddress); } + + +/* + * GetTablespaceName returns the name of the tablespace with the given oid, or + * NULL if it cannot be found in the syscache. + */ +static char * +GetTablespaceName(Oid tablespaceOid) +{ + HeapTuple tuple = SearchSysCache1(TABLESPACEOID, ObjectIdGetDatum(tablespaceOid)); + if (!HeapTupleIsValid(tuple)) + { + return NULL; + } + + Form_pg_tablespace 
tablespaceForm = (Form_pg_tablespace) GETSTRUCT(tuple); + + /* copy the name before releasing the syscache tuple it points into */ + char *tablespaceName = pstrdup(NameStr(tablespaceForm->spcname)); + + ReleaseSysCache(tuple); + + return tablespaceName; +} + + +/* + * DatabaseCollationInfo stores the collation related information of a database. + */ +typedef struct DatabaseCollationInfo +{ + char *collation; + char *ctype; + #if PG_VERSION_NUM >= PG_VERSION_15 + char *icu_locale; + char *collversion; + #endif +} DatabaseCollationInfo; + +/* + * GetDatabaseCollation takes the oid of a database and returns all of its collation + * related information. We need this method since the collation related fields of + * Form_pg_database are not directly accessible. + */ +static DatabaseCollationInfo +GetDatabaseCollation(Oid db_oid) +{ + DatabaseCollationInfo info; + bool isNull; + + Snapshot snapshot = RegisterSnapshot(GetLatestSnapshot()); + Relation rel = table_open(DatabaseRelationId, AccessShareLock); + HeapTuple tup = get_catalog_object_by_oid(rel, Anum_pg_database_oid, db_oid); + if (!HeapTupleIsValid(tup)) + { + elog(ERROR, "cache lookup failed for database %u", db_oid); + } + + TupleDesc tupdesc = RelationGetDescr(rel); + Datum collationDatum = heap_getattr(tup, Anum_pg_database_datcollate, tupdesc, + &isNull); + if (isNull) + { + info.collation = NULL; + } + else + { + info.collation = TextDatumGetCString(collationDatum); + } + + Datum ctypeDatum = heap_getattr(tup, Anum_pg_database_datctype, tupdesc, &isNull); + if (isNull) + { + info.ctype = NULL; + } + else + { + info.ctype = TextDatumGetCString(ctypeDatum); + } + + #if PG_VERSION_NUM >= PG_VERSION_15 + + Datum icuLocaleDatum = heap_getattr(tup, Anum_pg_database_daticulocale, tupdesc, + &isNull); + if (isNull) + { + info.icu_locale = NULL; + } + else + { + info.icu_locale = TextDatumGetCString(icuLocaleDatum); + } + + Datum collverDatum = heap_getattr(tup, Anum_pg_database_datcollversion, tupdesc, + &isNull); + if (isNull) + { + info.collversion = NULL; + } + else + { + info.collversion = TextDatumGetCString(collverDatum); + } + #endif + + table_close(rel, AccessShareLock); + UnregisterSnapshot(snapshot); + heap_freetuple(tup); + + return info; +} + + +/* + * FreeDatabaseCollationInfo frees the fields of the given DatabaseCollationInfo. + */ +static void +FreeDatabaseCollationInfo(DatabaseCollationInfo collInfo) +{ + if (collInfo.collation != NULL) + { + pfree(collInfo.collation); + } + if (collInfo.ctype != NULL) + { + pfree(collInfo.ctype); + } + #if PG_VERSION_NUM >= PG_VERSION_15 + if (collInfo.icu_locale != NULL) + { + pfree(collInfo.icu_locale); + } + if (collInfo.collversion != NULL) + { + pfree(collInfo.collversion); + } + #endif +} + + +#if PG_VERSION_NUM >= PG_VERSION_15 +static char * +get_locale_provider_string(char datlocprovider) +{ + switch (datlocprovider) + { + case 'c': + { + return "libc"; + } + + case 'i': + { + return "icu"; + } + + case 'l': + { + return "locale"; + } + + default: + return ""; + } +} + + +#endif + + +/* + * GenerateCreateDatabaseStatementFromPgDatabase takes a pg_database tuple and + * returns the corresponding CREATE DATABASE statement. + */ +static char * +GenerateCreateDatabaseStatementFromPgDatabase(Form_pg_database databaseForm) +{ + DatabaseCollationInfo collInfo = GetDatabaseCollation(databaseForm->oid); + + StringInfoData str; + initStringInfo(&str); + + appendStringInfo(&str, "CREATE DATABASE %s", quote_identifier(NameStr( + databaseForm-> + datname))); + + if (databaseForm->datdba != InvalidOid) + { + appendStringInfo(&str, " OWNER = %s", GetUserNameFromId(databaseForm->datdba, + false)); + } + + if (databaseForm->encoding != -1) + { + appendStringInfo(&str, " ENCODING = '%s'", pg_encoding_to_char( + databaseForm->encoding)); + } + + if (collInfo.collation != NULL) + { + 
appendStringInfo(&str, " LC_COLLATE = '%s'", collInfo.collation); + } + if (collInfo.ctype != NULL) + { + appendStringInfo(&str, " LC_CTYPE = '%s'", collInfo.ctype); + } + + #if PG_VERSION_NUM >= PG_VERSION_15 + if (collInfo.icu_locale != NULL) + { + appendStringInfo(&str, " ICU_LOCALE = '%s'", collInfo.icu_locale); + } + + if (databaseForm->datlocprovider != 0) + { + appendStringInfo(&str, " LOCALE_PROVIDER = '%s'", get_locale_provider_string( + databaseForm->datlocprovider)); + } + + if (collInfo.collversion != NULL) + { + appendStringInfo(&str, " COLLATION_VERSION = '%s'", collInfo.collversion); + } + #endif + + if (databaseForm->dattablespace != InvalidOid) + { + appendStringInfo(&str, " TABLESPACE = %s", quote_identifier(GetTablespaceName( + databaseForm-> + dattablespace))); + } + + appendStringInfo(&str, " ALLOW_CONNECTIONS = '%s'", databaseForm->datallowconn ? + "true" : "false"); + + if (databaseForm->datconnlimit >= 0) + { + appendStringInfo(&str, " CONNECTION LIMIT %d", databaseForm->datconnlimit); + } + + appendStringInfo(&str, " IS_TEMPLATE = '%s'", databaseForm->datistemplate ? "true" : + "false"); + + FreeDatabaseCollationInfo(collInfo); + + + return str.data; +} + + +/* + * GenerateCreateDatabaseCommandList scans the pg_database tuples and returns a list of + * CREATE DATABASE statements for all the databases in the cluster. The + * citus_internal_database_command UDF is used to send each CREATE DATABASE statement + * to the workers since the CREATE DATABASE statement gives an error in a transaction + * context. + */ +List * +GenerateCreateDatabaseCommandList(void) +{ + List *commands = NIL; + HeapTuple tuple; + + Relation pgDatabaseRel = table_open(DatabaseRelationId, AccessShareLock); + TableScanDesc scan = table_beginscan_catalog(pgDatabaseRel, 0, NULL); + + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + Form_pg_database databaseForm = (Form_pg_database) GETSTRUCT(tuple); + + char *createStmt = GenerateCreateDatabaseStatementFromPgDatabase(databaseForm); + + + StringInfo outerDbStmt = makeStringInfo(); + + /* wrap the CREATE DATABASE statement in the citus_internal_database_command UDF call */ + appendStringInfo(outerDbStmt, + "SELECT pg_catalog.citus_internal_database_command(%s)", + quote_literal_cstr( + createStmt)); + + elog(LOG, "outerDbStmt: %s", outerDbStmt->data); + + /* Add the statement to the list of commands */ + commands = lappend(commands, outerDbStmt->data); + } + + heap_endscan(scan); + table_close(pgDatabaseRel, AccessShareLock); + + return commands; +} diff --git a/src/backend/distributed/commands/distribute_object_ops.c b/src/backend/distributed/commands/distribute_object_ops.c index e09169702..3063a5705 100644 --- a/src/backend/distributed/commands/distribute_object_ops.c +++ b/src/backend/distributed/commands/distribute_object_ops.c @@ -469,7 +469,7 @@ static DistributeObjectOps Database_Alter = { static DistributeObjectOps Database_Create = { .deparse = DeparseCreateDatabaseStmt, .qualify = NULL, - .preprocess = NULL, + .preprocess = PreprocessCreateDatabaseStmt, .postprocess = PostprocessCreateDatabaseStmt, .objectType = OBJECT_DATABASE, .operationType = DIST_OPS_CREATE, @@ -484,7 +484,7 @@ static DistributeObjectOps Database_Drop = { .postprocess = NULL, .objectType = OBJECT_DATABASE, .operationType = DIST_OPS_DROP, - .address = NULL, + .address = DropDatabaseStmtObjectAddress, .markDistributed = false, }; diff --git a/src/backend/distributed/commands/index.c b/src/backend/distributed/commands/index.c index 8271cc4f4..275f253b3 100644 --- a/src/backend/distributed/commands/index.c 
+++ b/src/backend/distributed/commands/index.c @@ -938,7 +938,7 @@ CreateIndexTaskList(IndexStmt *indexStmt) task->dependentTaskList = NULL; task->anchorShardId = shardId; task->taskPlacementList = ActiveShardPlacementList(shardId); - task->cannotBeExecutedInTransction = indexStmt->concurrent; + task->cannotBeExecutedInTransaction = indexStmt->concurrent; taskList = lappend(taskList, task); @@ -983,7 +983,7 @@ CreateReindexTaskList(Oid relationId, ReindexStmt *reindexStmt) task->dependentTaskList = NULL; task->anchorShardId = shardId; task->taskPlacementList = ActiveShardPlacementList(shardId); - task->cannotBeExecutedInTransction = + task->cannotBeExecutedInTransaction = IsReindexWithParam_compat(reindexStmt, "concurrently"); taskList = lappend(taskList, task); @@ -1309,7 +1309,7 @@ DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt) task->dependentTaskList = NULL; task->anchorShardId = shardId; task->taskPlacementList = ActiveShardPlacementList(shardId); - task->cannotBeExecutedInTransction = dropStmt->concurrent; + task->cannotBeExecutedInTransaction = dropStmt->concurrent; taskList = lappend(taskList, task); diff --git a/src/backend/distributed/commands/role.c b/src/backend/distributed/commands/role.c index 754be1a2b..792efd934 100644 --- a/src/backend/distributed/commands/role.c +++ b/src/backend/distributed/commands/role.c @@ -65,7 +65,6 @@ static DefElem * makeDefElemBool(char *name, bool value); static List * GenerateRoleOptionsList(HeapTuple tuple); static List * GenerateGrantRoleStmtsFromOptions(RoleSpec *roleSpec, List *options); static List * GenerateGrantRoleStmtsOfRole(Oid roleid); -static void EnsureSequentialModeForRoleDDL(void); static char * GetRoleNameFromDbRoleSetting(HeapTuple tuple, TupleDesc DbRoleSettingDescription); @@ -1278,7 +1277,7 @@ CreateRoleStmtObjectAddress(Node *node, bool missing_ok, bool isPostprocess) * with the role the role needs to be visible on all connections used by the transaction, * meaning we can only use 1 connection per node. 
*/ -static void +void EnsureSequentialModeForRoleDDL(void) { if (!IsTransactionBlock()) diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index dd729cad0..0d400d139 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -80,6 +80,7 @@ #include "utils/inval.h" #include "utils/lsyscache.h" #include "utils/syscache.h" +#include "catalog/pg_database.h" bool EnableDDLPropagation = true; /* ddl propagation is enabled */ @@ -95,13 +96,13 @@ int UtilityHookLevel = 0; /* Local functions forward declarations for helper functions */ -static void ProcessUtilityInternal(PlannedStmt *pstmt, - const char *queryString, - ProcessUtilityContext context, - ParamListInfo params, - struct QueryEnvironment *queryEnv, - DestReceiver *dest, - QueryCompletion *completionTag); +static void citus_ProcessUtilityInternal(PlannedStmt *pstmt, + const char *queryString, + ProcessUtilityContext context, + ParamListInfo params, + struct QueryEnvironment *queryEnv, + DestReceiver *dest, + QueryCompletion *completionTag); static void set_indexsafe_procflags(void); static char * CurrentSearchPath(void); static void IncrementUtilityHookCountersIfNecessary(Node *parsetree); @@ -130,7 +131,7 @@ ProcessUtilityParseTree(Node *node, const char *queryString, ProcessUtilityConte /* - * multi_ProcessUtility is the main entry hook for implementing Citus-specific + * citus_ProcessUtility is the main entry hook for implementing Citus-specific * utility behavior. Its primary responsibilities are intercepting COPY and DDL * commands and augmenting the coordinator's command with corresponding tasks * to be run on worker nodes, after suitably ensuring said commands' options @@ -139,7 +140,7 @@ ProcessUtilityParseTree(Node *node, const char *queryString, ProcessUtilityConte * TRUNCATE and VACUUM are also supported. */ void -multi_ProcessUtility(PlannedStmt *pstmt, +citus_ProcessUtility(PlannedStmt *pstmt, const char *queryString, bool readOnlyTree, ProcessUtilityContext context, @@ -329,8 +330,8 @@ multi_ProcessUtility(PlannedStmt *pstmt, PG_TRY(); { - ProcessUtilityInternal(pstmt, queryString, context, params, queryEnv, dest, - completionTag); + citus_ProcessUtilityInternal(pstmt, queryString, context, params, queryEnv, dest, + completionTag); if (UtilityHookLevel == 1) { @@ -404,7 +405,7 @@ multi_ProcessUtility(PlannedStmt *pstmt, /* - * ProcessUtilityInternal is a helper function for multi_ProcessUtility where majority + * citus_ProcessUtilityInternal is a helper function for citus_ProcessUtility where majority * of the Citus specific utility statements are handled here. The distinction between * both functions is that Citus_ProcessUtility does not handle CALL and DO statements. * The reason for the distinction is implemented to be able to find the "top-level" DDL @@ -412,13 +413,13 @@ multi_ProcessUtility(PlannedStmt *pstmt, * this goal. 
*/ static void -ProcessUtilityInternal(PlannedStmt *pstmt, - const char *queryString, - ProcessUtilityContext context, - ParamListInfo params, - struct QueryEnvironment *queryEnv, - DestReceiver *dest, - QueryCompletion *completionTag) +citus_ProcessUtilityInternal(PlannedStmt *pstmt, + const char *queryString, + ProcessUtilityContext context, + ParamListInfo params, + struct QueryEnvironment *queryEnv, + DestReceiver *dest, + QueryCompletion *completionTag) { Node *parsetree = pstmt->utilityStmt; List *ddlJobs = NIL; @@ -578,6 +579,7 @@ ProcessUtilityInternal(PlannedStmt *pstmt, PreprocessLockStatement((LockStmt *) parsetree, context); } + /* * We only process ALTER TABLE ... ATTACH PARTITION commands in the function below * and distribute the partition if necessary. @@ -724,22 +726,12 @@ } /* - * Make sure that dropping the role deletes the pg_dist_object entries. There is a - * separate logic for roles, since roles are not included as dropped objects in the + * Make sure that dropping roles and databases deletes the pg_dist_object entries. There is + * separate logic for roles and databases, since they are not included as dropped objects in the * drop event trigger. To handle it both on worker and coordinator nodes, it is not * implemented as a part of process functions but here. */ - if (IsA(parsetree, DropRoleStmt)) - { - DropRoleStmt *stmt = castNode(DropRoleStmt, parsetree); - List *allDropRoles = stmt->roles; - - List *distributedDropRoles = FilterDistributedRoles(allDropRoles); - if (list_length(distributedDropRoles) > 0) - { - UnmarkRolesDistributed(distributedDropRoles); - } - } + UnmarkRolesAndDatabaseDistributed(parsetree); pstmt->utilityStmt = parsetree; @@ -1273,9 +1265,10 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) { ereport(WARNING, (errmsg( - "CONCURRENTLY-enabled index commands can fail partially, " - "leaving behind an INVALID index.\n Use DROP INDEX " - "CONCURRENTLY IF EXISTS to remove the invalid index."))); + "Commands that are not transaction-safe may result in partial failure" + ", potentially leading to an inconsistent state.\nIf the problematic command" + " is a CREATE operation, consider using the 'IF EXISTS' syntax to drop the " + "object,\nif applicable, and then reattempt the original command."))); PG_RE_THROW(); } } @@ -1386,7 +1379,7 @@ PostStandardProcessUtility(Node *parsetree) * on the local table first. However, in order to decide whether the * command leads to an invalidation, we need to check before the command * is being executed since we read pg_constraint table. Thus, we maintain a - * local flag and do the invalidation after multi_ProcessUtility, + * local flag and do the invalidation after citus_ProcessUtility, * before ExecuteDistributedDDLJob(). */ InvalidateForeignKeyGraphForDDL(); @@ -1489,6 +1482,28 @@ DDLTaskList(Oid relationId, const char *commandString) } +/* + * NontransactionalNodeDDLTask builds a list of tasks to execute a DDL command on a + * given target set of nodes, with cannotBeExecutedInTransaction set to make sure + * that the list is executed outside a transaction.
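+ * CREATE DATABASE and DROP DATABASE are the motivating examples: they cannot run + * inside a transaction block, so their propagated commands must be sent this way.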
+ */ +List * +NontransactionalNodeDDLTask(TargetWorkerSet targets, List *commands) +{ + List *ddlJobs = NodeDDLTaskList(targets, commands); + DDLJob *ddlJob = NULL; + foreach_ptr(ddlJob, ddlJobs) + { + Task *task = NULL; + foreach_ptr(task, ddlJob->taskList) + { + task->cannotBeExecutedInTransaction = true; + } + } + return ddlJobs; +} + + /* * NodeDDLTaskList builds a list of tasks to execute a DDL command on a * given target set of nodes. diff --git a/src/backend/distributed/commands/vacuum.c b/src/backend/distributed/commands/vacuum.c index ee03aeae1..21638ba7f 100644 --- a/src/backend/distributed/commands/vacuum.c +++ b/src/backend/distributed/commands/vacuum.c @@ -279,7 +279,7 @@ VacuumTaskList(Oid relationId, CitusVacuumParams vacuumParams, List *vacuumColum task->replicationModel = REPLICATION_MODEL_INVALID; task->anchorShardId = shardId; task->taskPlacementList = ActiveShardPlacementList(shardId); - task->cannotBeExecutedInTransction = ((vacuumParams.options) & VACOPT_VACUUM); + task->cannotBeExecutedInTransaction = ((vacuumParams.options) & VACOPT_VACUUM); taskList = lappend(taskList, task); } @@ -719,7 +719,7 @@ ExecuteUnqualifiedVacuumTasks(VacuumStmt *vacuumStmt, CitusVacuumParams vacuumPa SetTaskQueryStringList(task, unqualifiedVacuumCommands); task->dependentTaskList = NULL; task->replicationModel = REPLICATION_MODEL_INVALID; - task->cannotBeExecutedInTransction = ((vacuumParams.options) & VACOPT_VACUUM); + task->cannotBeExecutedInTransaction = ((vacuumParams.options) & VACOPT_VACUUM); bool hasPeerWorker = false; diff --git a/src/backend/distributed/deparser/citus_deparseutils.c b/src/backend/distributed/deparser/citus_deparseutils.c index 52d96930e..6492c14f2 100644 --- a/src/backend/distributed/deparser/citus_deparseutils.c +++ b/src/backend/distributed/deparser/citus_deparseutils.c @@ -31,38 +31,53 @@ optionToStatement(StringInfo buf, DefElem *option, const struct { if (strcmp(name, opt_formats[i].name) == 0) { - if (opt_formats[i].type == OPTION_FORMAT_STRING) + switch (opt_formats[i].type) { - char *value = defGetString(option); - appendStringInfo(buf, opt_formats[i].format, quote_identifier(value)); + case OPTION_FORMAT_STRING: + { + char *value = defGetString(option); + appendStringInfo(buf, opt_formats[i].format, quote_identifier(value)); + break; + } + + case OPTION_FORMAT_INTEGER: + { + int32 value = defGetInt32(option); + appendStringInfo(buf, opt_formats[i].format, value); + break; + } + + case OPTION_FORMAT_BOOLEAN: + { + bool value = defGetBoolean(option); + appendStringInfo(buf, opt_formats[i].format, value ? "true" : + "false"); + break; + } + + #if PG_VERSION_NUM >= PG_VERSION_15 + case OPTION_FORMAT_OBJECT_ID: + { + Oid value = defGetObjectId(option); + appendStringInfo(buf, opt_formats[i].format, value); + break; + } + + #endif + case OPTION_FORMAT_LITERAL_CSTR: + { + char *value = defGetString(option); + appendStringInfo(buf, opt_formats[i].format, quote_literal_cstr( + value)); + break; + } + + default: + { + elog(ERROR, "unrecognized option type: %d", opt_formats[i].type); + break; + } } - else if (opt_formats[i].type == OPTION_FORMAT_INTEGER) - { - int32 value = defGetInt32(option); - appendStringInfo(buf, opt_formats[i].format, value); - } - else if (opt_formats[i].type == OPTION_FORMAT_BOOLEAN) - { - bool value = defGetBoolean(option); - appendStringInfo(buf, opt_formats[i].format, value ? "true" : "false"); - } -#if PG_VERSION_NUM >= PG_VERSION_15 - else if (opt_formats[i].type == OPTION_FORMAT_OBJECT_ID) - { - Oid value = defGetObjectId(option); - appendStringInfo(buf, opt_formats[i].format, value); - } -#endif - else if (opt_formats[i].type == OPTION_FORMAT_LITERAL_CSTR) - { - char *value = defGetString(option); - appendStringInfo(buf, opt_formats[i].format, quote_literal_cstr(value)); - } - else - { - elog(ERROR, "unrecognized option type: %d", opt_formats[i].type); - } - break; } } } diff --git a/src/backend/distributed/deparser/deparse_database_stmts.c b/src/backend/distributed/deparser/deparse_database_stmts.c index 793e01d08..b3f12b03a 100644 --- a/src/backend/distributed/deparser/deparse_database_stmts.c +++ b/src/backend/distributed/deparser/deparse_database_stmts.c @@ -267,6 +267,21 @@ AppendCreateDatabaseStmt(StringInfo buf, CreatedbStmt *stmt) foreach_ptr(option, stmt->options) { + /* + * Propagation of the template, strategy, lc_ctype, locale, lc_collate, + * icu_locale and locale_provider options is not supported, since e.g. the + * template database is not stored in the catalog. + */ + if (strcmp(option->defname, "template") == 0 || + strcmp(option->defname, "strategy") == 0 || + strcmp(option->defname, "lc_ctype") == 0 || + strcmp(option->defname, "locale") == 0 || + strcmp(option->defname, "lc_collate") == 0 || + strcmp(option->defname, "icu_locale") == 0 || + strcmp(option->defname, "locale_provider") == 0) + { + ereport(ERROR, + errmsg("CREATE DATABASE option \"%s\" is not supported", + option->defname)); + } + optionToStatement(buf, option, create_database_option_formats, lengthof( create_database_option_formats)); } diff --git a/src/backend/distributed/executor/executor_util_tasks.c b/src/backend/distributed/executor/executor_util_tasks.c index abf721196..483fd55a7 100644 --- a/src/backend/distributed/executor/executor_util_tasks.c +++ b/src/backend/distributed/executor/executor_util_tasks.c @@ -61,7 +61,7 @@ TaskListRequiresRollback(List *taskList) } Task *task = (Task *) linitial(taskList); - if (task->cannotBeExecutedInTransction) + if (task->cannotBeExecutedInTransaction) { /* vacuum, create index concurrently etc.
*/ return false; @@ -164,7 +164,7 @@ TaskListCannotBeExecutedInTransaction(List *taskList) Task *task = NULL; foreach_ptr(task, taskList) { - if (task->cannotBeExecutedInTransction) + if (task->cannotBeExecutedInTransaction) { return true; } diff --git a/src/backend/distributed/metadata/distobject.c b/src/backend/distributed/metadata/distobject.c index c420e6ec3..af8354ee3 100644 --- a/src/backend/distributed/metadata/distobject.c +++ b/src/backend/distributed/metadata/distobject.c @@ -48,6 +48,8 @@ #include "utils/lsyscache.h" #include "utils/regproc.h" #include "utils/rel.h" +#include "catalog/pg_database.h" +#include "commands/dbcommands.h" static char * CreatePgDistObjectEntryCommand(const ObjectAddress *objectAddress); @@ -356,6 +358,33 @@ ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes, } + +/* + * UnmarkRolesAndDatabaseDistributed removes the pg_dist_object entries for the + * roles or the database dropped by the given DROP statement. + */ +void +UnmarkRolesAndDatabaseDistributed(Node *node) +{ + if (IsA(node, DropRoleStmt)) + { + DropRoleStmt *stmt = castNode(DropRoleStmt, node); + List *allDropRoles = stmt->roles; + + List *distributedDropRoles = FilterDistributedRoles(allDropRoles); + if (list_length(distributedDropRoles) > 0) + { + UnmarkRolesDistributed(distributedDropRoles); + } + } + else if (IsA(node, DropdbStmt)) + { + DropdbStmt *stmt = castNode(DropdbStmt, node); + char *dbName = stmt->dbname; + + Oid dbOid = get_database_oid(dbName, stmt->missing_ok); + ObjectAddress *dbAddress = palloc0(sizeof(ObjectAddress)); + ObjectAddressSet(*dbAddress, DatabaseRelationId, dbOid); + UnmarkObjectDistributed(dbAddress); + } +} + + /* * UnmarkObjectDistributed removes the entry from pg_dist_object that marks this object as * distributed. This will prevent updates to that object to be propagated to the worker. diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 55d0f11c5..85a945308 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -521,8 +521,7 @@ IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEntry, CitusTableType tabl /* - * HasDistributionKey returs true if given Citus table doesn't have a - * distribution key. + * HasDistributionKey returns true if the given Citus table has a distribution key. */ bool HasDistributionKey(Oid relationId) { @@ -538,8 +537,8 @@ } /* - * HasDistributionKey returs true if given cache entry identifies a Citus - * table that doesn't have a distribution key. + * HasDistributionKeyCacheEntry returns true if the given cache entry identifies a + * Citus table that has a distribution key.
*/ bool HasDistributionKeyCacheEntry(CitusTableCacheEntry *tableEntry) diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 40bdae0ea..54fa801ae 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -4501,6 +4501,13 @@ PropagateNodeWideObjectsCommandList(void) /* collect all commands */ List *ddlCommands = NIL; + if (EnableCreateDatabasePropagation) + { + /* Get commands for database creation */ + List *createDatabaseCommands = GenerateCreateDatabaseCommandList(); + ddlCommands = list_concat(ddlCommands, createDatabaseCommands); + } + if (EnableAlterRoleSetPropagation) { /* diff --git a/src/backend/distributed/planner/fast_path_router_planner.c b/src/backend/distributed/planner/fast_path_router_planner.c index 933ee7425..ed256296c 100644 --- a/src/backend/distributed/planner/fast_path_router_planner.c +++ b/src/backend/distributed/planner/fast_path_router_planner.c @@ -154,7 +154,7 @@ GeneratePlaceHolderPlannedStmt(Query *parse) * being a fast path router query. * The requirements for the fast path query can be listed below: * - * - SELECT query without CTES, sublinks-subqueries, set operations + * - SELECT/UPDATE/DELETE query without CTES, sublinks-subqueries, set operations * - The query should touch only a single hash distributed or reference table * - The distribution with equality operator should be in the WHERE clause * and it should be ANDed with any other filters. Also, the distribution diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 0d7a0de78..e70de5bbd 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -2324,27 +2324,11 @@ PlanRouterQuery(Query *originalQuery, TargetShardIntervalForFastPathQuery(originalQuery, &isMultiShardQuery, distributionKeyValue, partitionValueConst); - - /* - * This could only happen when there is a parameter on the distribution key. - * We defer error here, later the planner is forced to use a generic plan - * by assigning arbitrarily high cost to the plan. - */ - if (UpdateOrDeleteOrMergeQuery(originalQuery) && isMultiShardQuery) - { - planningError = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "Router planner cannot handle multi-shard " - "modify queries", NULL, NULL); - return planningError; - } + Assert(!isMultiShardQuery); *prunedShardIntervalListList = shardIntervalList; - - if (!isMultiShardQuery) - { - ereport(DEBUG2, (errmsg("Distributed planning for a fast-path router " - "query"))); - } + ereport(DEBUG2, (errmsg("Distributed planning for a fast-path router " + "query"))); } else { diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 00cdb0027..32ad4c427 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -481,6 +481,7 @@ _PG_init(void) #endif InitializeMaintenanceDaemon(); + InitializeMaintenanceDaemonForMainDb(); /* initialize coordinated transaction management */ InitializeTransactionManagement(); @@ -543,7 +544,7 @@ _PG_init(void) */ PrevProcessUtility = (ProcessUtility_hook != NULL) ? ProcessUtility_hook : standard_ProcessUtility; - ProcessUtility_hook = multi_ProcessUtility; + ProcessUtility_hook = citus_ProcessUtility; /* * Acquire symbols for columnar functions that citus calls. 
@@ -1831,6 +1832,16 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE | GUC_UNIT_MS, NULL, NULL, NULL); + DefineCustomStringVariable( + "citus.main_db", + gettext_noop("Which database is designated as the main_db"), + NULL, + &MainDb, + "", + PGC_POSTMASTER, + GUC_STANDARD, + NULL, NULL, NULL); + DefineCustomIntVariable( "citus.max_adaptive_executor_pool_size", gettext_noop("Sets the maximum number of connections per worker node used by " diff --git a/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql b/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql index 5d869f40e..578a182ef 100644 --- a/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql +++ b/src/backend/distributed/sql/citus--12.1-1--12.2-1.sql @@ -1,16 +1,4 @@ -- citus--12.1-1--12.2-1 - --- --- citus_internal_database_command creates a database according to the given command. - -CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_database_command(command text) - RETURNS void - LANGUAGE C - STRICT -AS 'MODULE_PATHNAME', $$citus_internal_database_command$$; -COMMENT ON FUNCTION pg_catalog.citus_internal_database_command(text) IS - 'run a database command without transaction block restrictions'; - -- bump version to 12.2-1 - +#include "udfs/citus_internal_database_command/12.2-1.sql" #include "udfs/citus_add_rebalance_strategy/12.2-1.sql" diff --git a/src/backend/distributed/sql/udfs/citus_internal_database_command/12.2-1.sql b/src/backend/distributed/sql/udfs/citus_internal_database_command/12.2-1.sql new file mode 100644 index 000000000..232e3ad14 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_database_command/12.2-1.sql @@ -0,0 +1,10 @@ +-- +-- citus_internal_database_command creates a database according to the given command. + +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_database_command(command text) + RETURNS void + LANGUAGE C + STRICT +AS 'MODULE_PATHNAME', $$citus_internal_database_command$$; +COMMENT ON FUNCTION pg_catalog.citus_internal_database_command(text) IS + 'run a database command without transaction block restrictions'; diff --git a/src/backend/distributed/sql/udfs/citus_internal_database_command/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_database_command/latest.sql new file mode 100644 index 000000000..232e3ad14 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_database_command/latest.sql @@ -0,0 +1,10 @@ +-- +-- citus_internal_database_command creates a database according to the given command. 
+ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_database_command(command text) + RETURNS void + LANGUAGE C + STRICT +AS 'MODULE_PATHNAME', $$citus_internal_database_command$$; +COMMENT ON FUNCTION pg_catalog.citus_internal_database_command(text) IS + 'run a database command without transaction block restrictions'; diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 7e1379ef3..fe4429f04 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -326,7 +326,7 @@ CopyNodeTask(COPYFUNC_ARGS) COPY_STRING_FIELD(fetchedExplainAnalyzePlan); COPY_SCALAR_FIELD(fetchedExplainAnalyzeExecutionDuration); COPY_SCALAR_FIELD(isLocalTableModification); - COPY_SCALAR_FIELD(cannotBeExecutedInTransction); + COPY_SCALAR_FIELD(cannotBeExecutedInTransaction); } diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index b4062751a..9b4ac809c 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -535,7 +535,7 @@ OutTask(OUTFUNC_ARGS) WRITE_STRING_FIELD(fetchedExplainAnalyzePlan); WRITE_FLOAT_FIELD(fetchedExplainAnalyzeExecutionDuration, "%.2f"); WRITE_BOOL_FIELD(isLocalTableModification); - WRITE_BOOL_FIELD(cannotBeExecutedInTransction); + WRITE_BOOL_FIELD(cannotBeExecutedInTransaction); } diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 5f49de20a..22a0843bd 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -99,6 +99,7 @@ int Recover2PCInterval = 60000; int DeferShardDeleteInterval = 15000; int BackgroundTaskQueueCheckInterval = 5000; int MaxBackgroundTaskExecutors = 4; +char *MainDb = ""; /* config variables for metadata sync timeout */ int MetadataSyncInterval = 60000; @@ -112,7 +113,7 @@ static MaintenanceDaemonControlData *MaintenanceDaemonControl = NULL; * activated. */ static HTAB *MaintenanceDaemonDBHash; - +static ErrorContextCallback errorCallback = { 0 }; static volatile sig_atomic_t got_SIGHUP = false; static volatile sig_atomic_t got_SIGTERM = false; @@ -125,6 +126,8 @@ static void MaintenanceDaemonShmemExit(int code, Datum arg); static void MaintenanceDaemonErrorContext(void *arg); static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData); static void WarnMaintenanceDaemonNotStarted(void); +static MaintenanceDaemonDBData * GetMaintenanceDaemonDBHashEntry(Oid databaseId, + bool *found); /* * InitializeMaintenanceDaemon, called at server start, is responsible for @@ -139,6 +142,82 @@ InitializeMaintenanceDaemon(void) } +/* + * GetMaintenanceDaemonDBHashEntry searches the MaintenanceDaemonDBHash for the + * databaseId. It returns the entry if found or creates a new entry and initializes + * the value with zeroes. 
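+ * Callers must already hold MaintenanceDaemonControl->lock; this function does not + * acquire the lock itself.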
+ */
+MaintenanceDaemonDBData *
+GetMaintenanceDaemonDBHashEntry(Oid databaseId, bool *found)
+{
+	MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search(
+		MaintenanceDaemonDBHash,
+		&databaseId,
+		HASH_ENTER_NULL,
+		found);
+
+	if (!dbData)
+	{
+		elog(LOG,
+			 "cannot create or find the maintenance daemon hash entry for database %u",
+			 databaseId);
+		return NULL;
+	}
+
+	if (!*found)
+	{
+		/* ensure the values in MaintenanceDaemonDBData are zero */
+		memset(((char *) dbData) + sizeof(Oid), 0,
+			   sizeof(MaintenanceDaemonDBData) - sizeof(Oid));
+	}
+
+	return dbData;
+}
+
+
+/*
+ * InitializeMaintenanceDaemonForMainDb is called in _PG_init, at which
+ * point we are not in a transaction and do not have a databaseOid yet.
+ */
+void
+InitializeMaintenanceDaemonForMainDb(void)
+{
+	if (strcmp(MainDb, "") == 0)
+	{
+		elog(LOG, "There is no designated main database.");
+		return;
+	}
+
+	BackgroundWorker worker;
+
+	memset(&worker, 0, sizeof(worker));
+
+	strcpy_s(worker.bgw_name, sizeof(worker.bgw_name),
+			 "Citus Maintenance Daemon for Main DB");
+
+	/* request ability to connect to target database */
+	worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
+
+	/*
+	 * No point in getting started before being able to run queries, but we
+	 * do want to get started on Hot-Standby.
+	 */
+	worker.bgw_start_time = BgWorkerStart_ConsistentState;
+
+	/* Restart a bit after errors, but don't bog down the system. */
+	worker.bgw_restart_time = 5;
+	strcpy_s(worker.bgw_library_name,
+			 sizeof(worker.bgw_library_name), "citus");
+	strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_function_name),
+			 "CitusMaintenanceDaemonMain");
+
+	worker.bgw_main_arg = (Datum) 0;
+
+	RegisterBackgroundWorker(&worker);
+}
+
+
 /*
  * InitializeMaintenanceDaemonBackend, called at backend start and
  * configuration changes, is responsible for starting a per-database
@@ -148,31 +227,20 @@ void
 InitializeMaintenanceDaemonBackend(void)
 {
 	Oid extensionOwner = CitusExtensionOwner();
-	bool found;
+	bool found = false;
 
 	LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
 
-	MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search(
-		MaintenanceDaemonDBHash,
-		&MyDatabaseId,
-		HASH_ENTER_NULL,
-		&found);
+	MaintenanceDaemonDBData *dbData = GetMaintenanceDaemonDBHashEntry(MyDatabaseId,
+																	  &found);
 
 	if (dbData == NULL)
 	{
 		WarnMaintenanceDaemonNotStarted();
 		LWLockRelease(&MaintenanceDaemonControl->lock);
-
 		return;
 	}
 
-	if (!found)
-	{
-		/* ensure the values in MaintenanceDaemonDBData are zero */
-		memset(((char *) dbData) + sizeof(Oid), 0,
-			   sizeof(MaintenanceDaemonDBData) - sizeof(Oid));
-	}
-
 	if (IsMaintenanceDaemon)
 	{
 		/*
@@ -271,66 +339,97 @@ WarnMaintenanceDaemonNotStarted(void)
 
 
 /*
- * CitusMaintenanceDaemonMain is the maintenance daemon's main routine, it'll
- * be started by the background worker infrastructure. If it errors out,
- * it'll be restarted after a few seconds.
+ * ConnectToDatabase connects to the database for the given databaseOid.
+ * If databaseOid is 0, it connects to MainDb and then creates a hash entry.
+ * If a hash entry cannot be created for MainDb, it exits the process
+ * requesting a restart. For regular databases, however, it exits without
+ * requesting a restart, since another subsequent backend is expected to
+ * start the Maintenance Daemon.
+ * If the found hash entry has a valid workerPid, it exits
+ * without requesting a restart since there is already a daemon running.
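+ *
+ * Sketch of the resulting control flow (a summary of the code below,
+ * details elided):
+ *
+ *   databaseOid == 0 (MainDb worker):
+ *       connect by name, then create/find the hash entry for MyDatabaseId;
+ *       no entry -> proc_exit(1) to request a restart;
+ *       entry with a live workerPid -> proc_exit(0).
+ *   databaseOid != 0 (regular worker):
+ *       look up the existing hash entry;
+ *       no entry, or a live workerPid -> proc_exit(0).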
 */
-void
-CitusMaintenanceDaemonMain(Datum main_arg)
+static MaintenanceDaemonDBData *
+ConnectToDatabase(Oid databaseOid)
 {
-	Oid databaseOid = DatumGetObjectId(main_arg);
-	TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY =
-		TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000);
-	bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false;
-	TimestampTz lastRecoveryTime = 0;
-	TimestampTz lastShardCleanTime = 0;
-	TimestampTz lastStatStatementsPurgeTime = 0;
-	TimestampTz nextMetadataSyncTime = 0;
+	MaintenanceDaemonDBData *myDbData = NULL;
 
-	/* state kept for the background tasks queue monitor */
-	TimestampTz lastBackgroundTaskQueueCheck = GetCurrentTimestamp();
-	BackgroundWorkerHandle *backgroundTasksQueueBgwHandle = NULL;
-	bool backgroundTasksQueueWarnedForLock = false;
-
-	/*
-	 * We do metadata sync in a separate background worker. We need its
-	 * handle to be able to check its status.
-	 */
-	BackgroundWorkerHandle *metadataSyncBgwHandle = NULL;
+	bool isMainDb = false;
 
-	/*
-	 * Look up this worker's configuration.
-	 */
 	LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
 
-	MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *)
-		hash_search(MaintenanceDaemonDBHash, &databaseOid,
-					HASH_FIND, NULL);
-	if (!myDbData)
-	{
-		/*
-		 * When the database crashes, background workers are restarted, but
-		 * the state in shared memory is lost. In that case, we exit and
-		 * wait for a session to call InitializeMaintenanceDaemonBackend
-		 * to properly add it to the hash.
-		 */
-		proc_exit(0);
+	if (databaseOid == 0)
+	{
+		char *databaseName = MainDb;
+
+		/*
+		 * Since we cannot look up the database OID before initializing
+		 * Postgres, connect to the database by name.
+		 */
+		BackgroundWorkerInitializeConnection(databaseName, NULL, 0);
+
+		/*
+		 * Now we have a valid MyDatabaseId.
+		 * Insert the hash entry for the database into the Maintenance Daemon Hash.
+		 */
+		bool found = false;
+
+		myDbData = GetMaintenanceDaemonDBHashEntry(MyDatabaseId, &found);
+
+		if (!myDbData)
+		{
+			/*
+			 * If an entry cannot be created, a return code of 1 requests a
+			 * worker restart. Since the BackgroundWorker for the MainDb is
+			 * only registered once during server startup, we need to retry.
+			 */
+			proc_exit(1);
+		}
+
+		if (found && myDbData->workerPid != 0)
+		{
+			/* Another maintenance daemon is running. */
+
+			proc_exit(0);
+		}
+
+		databaseOid = MyDatabaseId;
+		myDbData->userOid = GetSessionUserId();
+		isMainDb = true;
+	}
+	else
+	{
+		myDbData = (MaintenanceDaemonDBData *)
+			hash_search(MaintenanceDaemonDBHash, &databaseOid,
+						HASH_FIND, NULL);
+
+		if (!myDbData)
+		{
+			/*
+			 * When the database crashes, background workers are restarted, but
+			 * the state in shared memory is lost. In that case, we exit and
+			 * wait for a session to call InitializeMaintenanceDaemonBackend
+			 * to properly add it to the hash.
+			 */
+
+			proc_exit(0);
+		}
+
+		if (myDbData->workerPid != 0)
+		{
+			/*
+			 * Another maintenance daemon is running. This usually happens
+			 * because postgres restarts the daemon after a non-zero exit, and
+			 * InitializeMaintenanceDaemonBackend started one before postgres
+			 * did. In that case, the first one stays and the last one exits.
+			 */
+
+			proc_exit(0);
+		}
+	}
 
-	if (myDbData->workerPid != 0)
-	{
-		/*
-		 * Another maintenance daemon is running. This usually happens because
-		 * postgres restarts the daemon after an non-zero exit, and
-		 * InitializeMaintenanceDaemonBackend started one before postgres did.
-		 * In that case, the first one stays and the last one exits.
- */ - - proc_exit(0); - } - - before_shmem_exit(MaintenanceDaemonShmemExit, main_arg); + before_shmem_exit(MaintenanceDaemonShmemExit, ObjectIdGetDatum(databaseOid)); /* * Signal that I am the maintenance daemon now. @@ -356,25 +455,55 @@ CitusMaintenanceDaemonMain(Datum main_arg) LWLockRelease(&MaintenanceDaemonControl->lock); - /* - * Setup error context so log messages can be properly attributed. Some of - * them otherwise sound like they might be from a normal user connection. - * Do so before setting up signals etc, so we never exit without the - * context setup. - */ - ErrorContextCallback errorCallback = { 0 }; memset(&errorCallback, 0, sizeof(errorCallback)); errorCallback.callback = MaintenanceDaemonErrorContext; errorCallback.arg = (void *) myDbData; errorCallback.previous = error_context_stack; error_context_stack = &errorCallback; - elog(LOG, "starting maintenance daemon on database %u user %u", databaseOid, myDbData->userOid); - /* connect to database, after that we can actually access catalogs */ - BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid, 0); + if (!isMainDb) + { + /* connect to database, after that we can actually access catalogs */ + BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid, 0); + } + + return myDbData; +} + + +/* + * CitusMaintenanceDaemonMain is the maintenance daemon's main routine, it'll + * be started by the background worker infrastructure. If it errors out, + * it'll be restarted after a few seconds. + */ +void +CitusMaintenanceDaemonMain(Datum main_arg) +{ + Oid databaseOid = DatumGetObjectId(main_arg); + TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY = + TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000); + bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false; + TimestampTz lastRecoveryTime = 0; + TimestampTz lastShardCleanTime = 0; + TimestampTz lastStatStatementsPurgeTime = 0; + TimestampTz nextMetadataSyncTime = 0; + + /* state kept for the background tasks queue monitor */ + TimestampTz lastBackgroundTaskQueueCheck = GetCurrentTimestamp(); + BackgroundWorkerHandle *backgroundTasksQueueBgwHandle = NULL; + bool backgroundTasksQueueWarnedForLock = false; + + + /* + * We do metadata sync in a separate background worker. We need its + * handle to be able to check its status. + */ + BackgroundWorkerHandle *metadataSyncBgwHandle = NULL; + + MaintenanceDaemonDBData *myDbData = ConnectToDatabase(databaseOid); /* make worker recognizable in pg_stat_activity */ pgstat_report_appname("Citus Maintenance Daemon"); @@ -383,7 +512,7 @@ CitusMaintenanceDaemonMain(Datum main_arg) * Terminate orphaned metadata sync daemons spawned from previously terminated * or crashed maintenanced instances. 
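+ * (MyDatabaseId is used rather than the main_arg-derived databaseOid
+ * because main_arg is 0 for the MainDb worker; after ConnectToDatabase()
+ * has run, MyDatabaseId is valid on both paths.)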
*/ - SignalMetadataSyncDaemon(databaseOid, SIGTERM); + SignalMetadataSyncDaemon(MyDatabaseId, SIGTERM); /* enter main loop */ while (!got_SIGTERM) @@ -945,7 +1074,7 @@ MaintenanceDaemonShmemExit(int code, Datum arg) } -/* MaintenanceDaemonSigTermHandler calls proc_exit(0) */ +/* MaintenanceDaemonSigTermHandler sets the got_SIGTERM flag.*/ static void MaintenanceDaemonSigTermHandler(SIGNAL_ARGS) { diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index e14e60e90..e4a624ddf 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -237,9 +237,17 @@ extern List * CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool extern List * PreprocessAlterDatabaseSetStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext); +extern List * PreprocessCreateDatabaseStmt(Node *node, const char *queryString, + ProcessUtilityContext processUtilityContext); extern List * PostprocessCreateDatabaseStmt(Node *node, const char *queryString); extern List * PreprocessDropDatabaseStmt(Node *node, const char *queryString, ProcessUtilityContext processUtilityContext); +extern List * DropDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool + isPostprocess); +extern List * CreateDatabaseStmtObjectAddress(Node *node, bool missing_ok, bool + isPostprocess); +extern List * GenerateCreateDatabaseCommandList(void); + extern List * PreprocessAlterDatabaseRenameStmt(Node *node, const char *queryString, ProcessUtilityContext @@ -515,6 +523,7 @@ extern List * RenameRoleStmtObjectAddress(Node *stmt, bool missing_ok, bool extern void UnmarkRolesDistributed(List *roles); extern List * FilterDistributedRoles(List *roles); +extern void EnsureSequentialModeForRoleDDL(void); /* schema.c - forward declarations */ extern List * PostprocessCreateSchemaStmt(Node *node, const char *queryString); diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h index 9ae57b49a..1790eb468 100644 --- a/src/include/distributed/commands/utility_hook.h +++ b/src/include/distributed/commands/utility_hook.h @@ -79,7 +79,7 @@ typedef struct DDLJob extern ProcessUtility_hook_type PrevProcessUtility; -extern void multi_ProcessUtility(PlannedStmt *pstmt, const char *queryString, +extern void citus_ProcessUtility(PlannedStmt *pstmt, const char *queryString, bool readOnlyTree, ProcessUtilityContext context, ParamListInfo params, struct QueryEnvironment *queryEnv, DestReceiver *dest, @@ -94,6 +94,7 @@ extern void ProcessUtilityParseTree(Node *node, const char *queryString, extern void MarkInvalidateForeignKeyGraph(void); extern void InvalidateForeignKeyGraphForDDL(void); extern List * DDLTaskList(Oid relationId, const char *commandString); +extern List * NontransactionalNodeDDLTask(TargetWorkerSet targets, List *commands); extern List * NodeDDLTaskList(TargetWorkerSet targets, List *commands); extern bool AlterTableInProgress(void); extern bool DropSchemaOrDBInProgress(void); diff --git a/src/include/distributed/maintenanced.h b/src/include/distributed/maintenanced.h index de1e68883..07387a7fd 100644 --- a/src/include/distributed/maintenanced.h +++ b/src/include/distributed/maintenanced.h @@ -20,6 +20,7 @@ /* config variable for */ extern double DistributedDeadlockDetectionTimeoutFactor; +extern char *MainDb; extern void StopMaintenanceDaemon(Oid databaseId); extern void TriggerNodeMetadataSync(Oid databaseId); @@ -27,6 +28,7 @@ extern void InitializeMaintenanceDaemon(void); extern size_t 
MaintenanceDaemonShmemSize(void); extern void MaintenanceDaemonShmemInit(void); extern void InitializeMaintenanceDaemonBackend(void); +extern void InitializeMaintenanceDaemonForMainDb(void); extern bool LockCitusExtension(void); extern PGDLLEXPORT void CitusMaintenanceDaemonMain(Datum main_arg); diff --git a/src/include/distributed/metadata/distobject.h b/src/include/distributed/metadata/distobject.h index ba984091c..86fada5f7 100644 --- a/src/include/distributed/metadata/distobject.h +++ b/src/include/distributed/metadata/distobject.h @@ -27,6 +27,7 @@ extern void MarkObjectDistributed(const ObjectAddress *distAddress); extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress); extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress); extern void UnmarkObjectDistributed(const ObjectAddress *address); +extern void UnmarkRolesAndDatabaseDistributed(Node *node); extern bool IsTableOwnedByExtension(Oid relationId); extern bool ObjectAddressDependsOnExtension(const ObjectAddress *target); extern bool IsAnyObjectAddressOwnedByExtension(const List *targets, diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index b7acc0574..35d83eb33 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -329,7 +329,7 @@ typedef struct Task /* * Vacuum, create/drop/reindex concurrently cannot be executed in a transaction. */ - bool cannotBeExecutedInTransction; + bool cannotBeExecutedInTransaction; Const *partitionKeyValue; int colocationId; diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index efa9e310f..1d293e964 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -222,7 +222,7 @@ s/(CONTEXT: PL\/pgSQL function .* line )([0-9]+)/\1XX/g s/^(PL\/pgSQL function .* line) [0-9]+ (.*)/\1 XX \2/g # normalize a test difference in multi_move_mx -s/ connection to server at "\w+" \(127\.0\.0\.1\), port [0-9]+ failed://g +s/ connection to server at "\w+" (\(127\.0\.0\.1\)|\(::1\)), port [0-9]+ failed://g # normalize differences in tablespace of new index s/pg14\.idx.*/pg14\.xxxxx/g diff --git a/src/test/regress/citus_tests/common.py b/src/test/regress/citus_tests/common.py index 907102482..53c9c7944 100644 --- a/src/test/regress/citus_tests/common.py +++ b/src/test/regress/citus_tests/common.py @@ -453,6 +453,9 @@ def cleanup_test_leftovers(nodes): for node in nodes: node.cleanup_schemas() + for node in nodes: + node.cleanup_databases() + for node in nodes: node.cleanup_users() @@ -753,6 +756,7 @@ class Postgres(QueryRunner): self.subscriptions = set() self.publications = set() self.replication_slots = set() + self.databases = set() self.schemas = set() self.users = set() @@ -993,6 +997,10 @@ class Postgres(QueryRunner): args = sql.SQL("") self.sql(sql.SQL("CREATE USER {} {}").format(sql.Identifier(name), args)) + def create_database(self, name): + self.databases.add(name) + self.sql(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(name))) + def create_schema(self, name): self.schemas.add(name) self.sql(sql.SQL("CREATE SCHEMA {}").format(sql.Identifier(name))) @@ -1020,6 +1028,12 @@ class Postgres(QueryRunner): for user in self.users: self.sql(sql.SQL("DROP USER IF EXISTS {}").format(sql.Identifier(user))) + def cleanup_databases(self): + for database in self.databases: + self.sql( + sql.SQL("DROP DATABASE IF EXISTS {}").format(sql.Identifier(database)) + ) + def 
 cleanup_schemas(self):
         for schema in self.schemas:
             self.sql(
diff --git a/src/test/regress/citus_tests/run_test.py b/src/test/regress/citus_tests/run_test.py
index 6528834ae..90c16b04e 100755
--- a/src/test/regress/citus_tests/run_test.py
+++ b/src/test/regress/citus_tests/run_test.py
@@ -151,6 +151,8 @@ DEPS = {
         ],
         worker_count=6,
     ),
+    "create_drop_database_propagation": TestDeps("minimal_schedule"),
+    "create_drop_database_propagation_pg15": TestDeps("minimal_schedule"),
     "function_propagation": TestDeps("minimal_schedule"),
     "citus_shards": TestDeps("minimal_schedule"),
     "grant_on_foreign_server_propagation": TestDeps("minimal_schedule"),
diff --git a/src/test/regress/citus_tests/test/test_maintenancedeamon.py b/src/test/regress/citus_tests/test/test_maintenancedeamon.py
new file mode 100644
index 000000000..3f6cb501e
--- /dev/null
+++ b/src/test/regress/citus_tests/test/test_maintenancedeamon.py
@@ -0,0 +1,74 @@
+# This test checks that once citus.main_db is set and the server is
+# restarted, a Citus Maintenance Daemon for the main_db is launched.
+# This should happen even if no query has been run in main_db yet.
+import time
+
+
+def wait_until_maintenance_daemons_start(daemoncount, cluster):
+    i = 0
+    n = 0
+
+    while i < 10:
+        i += 1
+        n = cluster.coordinator.sql_value(
+            "SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon';"
+        )
+
+        if n == daemoncount:
+            break
+
+        time.sleep(0.1)
+
+    assert n == daemoncount
+
+
+def test_set_maindb(cluster_factory):
+    cluster = cluster_factory(0)
+
+    # Test that once citus.main_db is set to a database name,
+    # there are two maintenance daemons running upon restart:
+    # one maintenance daemon for the database of the current connection
+    # and one for the citus.main_db.
+    cluster.coordinator.create_database("mymaindb")
+    cluster.coordinator.configure("citus.main_db='mymaindb'")
+    cluster.coordinator.restart()
+
+    assert cluster.coordinator.sql_value("SHOW citus.main_db;") == "mymaindb"
+
+    wait_until_maintenance_daemons_start(2, cluster)
+
+    assert (
+        cluster.coordinator.sql_value(
+            "SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon' AND datname='mymaindb';"
+        )
+        == 1
+    )
+
+    # Test that once citus.main_db is set to an empty string,
+    # there is only one maintenance daemon, for the database
+    # of the current connection.
+    cluster.coordinator.configure("citus.main_db=''")
+    cluster.coordinator.restart()
+    assert cluster.coordinator.sql_value("SHOW citus.main_db;") == ""
+
+    wait_until_maintenance_daemons_start(1, cluster)
+
+    # Test that after the citus.main_db database is dropped, the maintenance
+    # daemon for that database is terminated.
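+    # (wait_until_maintenance_daemons_start, defined above, polls
+    # pg_stat_activity every 0.1 seconds and gives up after 10 attempts,
+    # so the expected daemon count must be reached within roughly one
+    # second for these checks to pass.)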
+    cluster.coordinator.configure("citus.main_db='mymaindb'")
+    cluster.coordinator.restart()
+    assert cluster.coordinator.sql_value("SHOW citus.main_db;") == "mymaindb"
+
+    wait_until_maintenance_daemons_start(2, cluster)
+
+    cluster.coordinator.sql("DROP DATABASE mymaindb;")
+
+    wait_until_maintenance_daemons_start(1, cluster)
+
+    assert (
+        cluster.coordinator.sql_value(
+            "SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon' AND datname='mymaindb';"
+        )
+        == 0
+    )
diff --git a/src/test/regress/expected/columnar_create.out b/src/test/regress/expected/columnar_create.out
index 73b891177..a134fd063 100644
--- a/src/test/regress/expected/columnar_create.out
+++ b/src/test/regress/expected/columnar_create.out
@@ -178,32 +178,31 @@ SELECT columnar_test_helpers.columnar_metadata_has_storage_id(:columnar_table_1_
 CREATE TEMPORARY TABLE columnar_temp(i int) USING columnar;
 -- reserve some chunks and a stripe
 INSERT INTO columnar_temp SELECT i FROM generate_series(1,5) i;
-SELECT columnar.get_storage_id(oid) AS columnar_temp_storage_id
-FROM pg_class WHERE relname='columnar_temp' \gset
-SELECT pg_backend_pid() AS val INTO old_backend_pid;
+SELECT columnar.get_storage_id(oid) as oid INTO columnar_temp_storage_id
+FROM pg_class WHERE relname='columnar_temp';
 \c - - - :master_port
 SET search_path TO columnar_create;
--- wait until old backend to expire to make sure that temp table cleanup is complete
-SELECT columnar_test_helpers.pg_waitpid(val) FROM old_backend_pid;
- pg_waitpid
---------------------------------------------------------------------
+-- wait until the temporary table and its metadata are removed
+DO $$
+DECLARE
+    loop_wait_count integer := 0;
+BEGIN
+    WHILE (
+        (SELECT COUNT(*) > 0 FROM pg_class WHERE relname='columnar_temp') OR
+        (SELECT columnar_test_helpers.columnar_metadata_has_storage_id(oid) FROM columnar_temp_storage_id)
+    )
+    LOOP
+        IF loop_wait_count > 1000 THEN
+            RAISE EXCEPTION 'Timeout while waiting for temporary table to be dropped';
+        END IF;
 
-(1 row)
-
-DROP TABLE old_backend_pid;
--- show that temporary table itself and its metadata is removed
-SELECT COUNT(*)=0 FROM pg_class WHERE relname='columnar_temp';
- ?column?
---------------------------------------------------------------------
- t
-(1 row)
-
-SELECT columnar_test_helpers.columnar_metadata_has_storage_id(:columnar_temp_storage_id);
- columnar_metadata_has_storage_id
---------------------------------------------------------------------
- f
-(1 row)
+        PERFORM pg_sleep(0.001);
+        loop_wait_count := loop_wait_count + 1;
+    END LOOP;
+END;
+$$ language plpgsql;
+DROP TABLE columnar_temp_storage_id;
 -- connect to another session and create a temp table with same name
 CREATE TEMPORARY TABLE columnar_temp(i int) USING columnar;
 -- reserve some chunks and a stripe
diff --git a/src/test/regress/expected/create_drop_database_propagation.out b/src/test/regress/expected/create_drop_database_propagation.out
index 37829a6ee..9489664eb 100644
--- a/src/test/regress/expected/create_drop_database_propagation.out
+++ b/src/test/regress/expected/create_drop_database_propagation.out
@@ -1,3 +1,6 @@
+-- test for create/drop database propagation
+-- This test only executes for Postgres 14
+-- For Postgres 15 tests, create_drop_database_propagation_pg15.sql is used
 \set create_drop_db_tablespace :abs_srcdir '/tmp_check/ts3'
 CREATE TABLESPACE create_drop_db_tablespace LOCATION :'create_drop_db_tablespace';
 \c - - - :worker_1_port
@@ -10,113 +13,261 @@ CREATE TABLESPACE create_drop_db_tablespace LOCATION :'create_drop_db_tablespace
 create user create_drop_db_test_user;
 set citus.enable_create_database_propagation=on;
 CREATE DATABASE mydatabase
-    WITH TEMPLATE = 'template0'
-    OWNER = create_drop_db_test_user
-    CONNECTION LIMIT = 10
+    WITH OWNER = create_drop_db_test_user
     ENCODING = 'UTF8'
-    LC_COLLATE = 'C'
-    LC_CTYPE = 'C'
+    CONNECTION LIMIT = 10
     TABLESPACE = create_drop_db_tablespace
     ALLOW_CONNECTIONS = true
    IS_TEMPLATE = false;
-SELECT pd.datname, pd.encoding,
-pd.datistemplate, pd.datallowconn, pd.datconnlimit,
-pd.datcollate , pd. datctype , pd.datacl, rolname AS database_owner,
-pa.rolname AS database_owner, pt.spcname AS tablespace
-FROM pg_database pd
-JOIN pg_authid pa ON pd.datdba = pa.oid
-join pg_tablespace pt on pd.dattablespace = pt.oid
-WHERE datname = 'mydatabase';
- datname | encoding | datistemplate | datallowconn | datconnlimit | datcollate | datctype | datacl | database_owner | database_owner | tablespace
+SELECT result from run_command_on_all_nodes(
+  $$
+  SELECT jsonb_agg(to_jsonb(q2.*)) FROM (
+    SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding,
+    pd.datistemplate, pd.datallowconn, pd.datconnlimit,
+    pd.datcollate , pd.
datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'mydatabase' + ) q2 + $$ +) ORDER BY result; + result --------------------------------------------------------------------- - mydatabase | 6 | f | t | 10 | C | C | | create_drop_db_test_user | create_drop_db_test_user | create_drop_db_tablespace -(1 row) + [{"datacl": null, "datname": "mydatabase", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "create_drop_db_tablespace", "datallowconn": true, "datconnlimit": 10, "datistemplate": false, "database_owner": "create_drop_db_test_user"}] + [{"datacl": null, "datname": "mydatabase", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "create_drop_db_tablespace", "datallowconn": true, "datconnlimit": 10, "datistemplate": false, "database_owner": "create_drop_db_test_user"}] + [{"datacl": null, "datname": "mydatabase", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "create_drop_db_tablespace", "datallowconn": true, "datconnlimit": 10, "datistemplate": false, "database_owner": "create_drop_db_test_user"}] +(3 rows) -\c - - - :worker_1_port -SELECT pd.datname, pd.encoding, -pd.datistemplate, pd.datallowconn, pd.datconnlimit, -pd.datcollate , pd. datctype , pd.datacl, rolname AS database_owner, -pa.rolname AS database_owner, pt.spcname AS tablespace -FROM pg_database pd -JOIN pg_authid pa ON pd.datdba = pa.oid -join pg_tablespace pt on pd.dattablespace = pt.oid -WHERE datname = 'mydatabase'; - datname | encoding | datistemplate | datallowconn | datconnlimit | datcollate | datctype | datacl | database_owner | database_owner | tablespace ---------------------------------------------------------------------- - mydatabase | 6 | f | t | 10 | C | C | | create_drop_db_test_user | create_drop_db_test_user | create_drop_db_tablespace -(1 row) - -\c - - - :worker_2_port -SELECT pd.datname, pd.encoding, -pd.datistemplate, pd.datallowconn, pd.datconnlimit, -pd.datcollate , pd. datctype , pd.datacl, rolname AS database_owner, -pa.rolname AS database_owner, pt.spcname AS tablespace -FROM pg_database pd -JOIN pg_authid pa ON pd.datdba = pa.oid -join pg_tablespace pt on pd.dattablespace = pt.oid -WHERE datname = 'mydatabase'; - datname | encoding | datistemplate | datallowconn | datconnlimit | datcollate | datctype | datacl | database_owner | database_owner | tablespace ---------------------------------------------------------------------- - mydatabase | 6 | f | t | 10 | C | C | | create_drop_db_test_user | create_drop_db_test_user | create_drop_db_tablespace -(1 row) - -\c - - - :master_port -set citus.enable_create_database_propagation=on; drop database mydatabase; -SELECT pd.datname, pd.encoding, -pd.datistemplate, pd.datallowconn, pd.datconnlimit, -pd.datcollate , pd. datctype , pd.datacl, rolname AS database_owner, -pa.rolname AS database_owner, pt.spcname AS tablespace -FROM pg_database pd -JOIN pg_authid pa ON pd.datdba = pa.oid -join pg_tablespace pt on pd.dattablespace = pt.oid -WHERE datname = 'mydatabase'; - datname | encoding | datistemplate | datallowconn | datconnlimit | datcollate | datctype | datacl | database_owner | database_owner | tablespace +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. 
datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'mydatabase' + ) q2 + $$ +) ORDER BY result; + result --------------------------------------------------------------------- -(0 rows) -\c - - - :worker_1_port -SELECT pd.datname, pd.encoding, -pd.datistemplate, pd.datallowconn, pd.datconnlimit, -pd.datcollate , pd. datctype , pd.datacl, rolname AS database_owner, -pa.rolname AS database_owner, pt.spcname AS tablespace -FROM pg_database pd -JOIN pg_authid pa ON pd.datdba = pa.oid -join pg_tablespace pt on pd.dattablespace = pt.oid -WHERE datname = 'mydatabase'; - datname | encoding | datistemplate | datallowconn | datconnlimit | datcollate | datctype | datacl | database_owner | database_owner | tablespace + + +(3 rows) + +-- test database syncing after node addition +select 1 from citus_remove_node('localhost', :worker_2_port); + ?column? --------------------------------------------------------------------- -(0 rows) + 1 +(1 row) -\c - - - :worker_2_port -SELECT pd.datname, pd.encoding, -pd.datistemplate, pd.datallowconn, pd.datconnlimit, -pd.datcollate , pd. datctype , pd.datacl, rolname AS database_owner, -pa.rolname AS database_owner, pt.spcname AS tablespace -FROM pg_database pd -JOIN pg_authid pa ON pd.datdba = pa.oid -join pg_tablespace pt on pd.dattablespace = pt.oid -WHERE datname = 'mydatabase'; - datname | encoding | datistemplate | datallowconn | datconnlimit | datcollate | datctype | datacl | database_owner | database_owner | tablespace +--test with is_template true and allow connections false +CREATE DATABASE mydatabase + OWNER = create_drop_db_test_user + CONNECTION LIMIT = 10 + ENCODING = 'UTF8' + TABLESPACE = create_drop_db_tablespace + ALLOW_CONNECTIONS = false + IS_TEMPLATE = false; +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'mydatabase' + ) q2 + $$ +) ORDER BY result; + result --------------------------------------------------------------------- -(0 rows) + [{"datacl": null, "datname": "mydatabase", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "create_drop_db_tablespace", "datallowconn": false, "datconnlimit": 10, "datistemplate": false, "database_owner": "create_drop_db_test_user"}] + [{"datacl": null, "datname": "mydatabase", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "create_drop_db_tablespace", "datallowconn": false, "datconnlimit": 10, "datistemplate": false, "database_owner": "create_drop_db_test_user"}] +(2 rows) + +select 1 from citus_add_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. 
datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'mydatabase' + ) q2 + $$ +) ORDER BY result; + result +--------------------------------------------------------------------- + [{"datacl": null, "datname": "mydatabase", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "create_drop_db_tablespace", "datallowconn": false, "datconnlimit": 10, "datistemplate": false, "database_owner": "create_drop_db_test_user"}] + [{"datacl": null, "datname": "mydatabase", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "create_drop_db_tablespace", "datallowconn": false, "datconnlimit": 10, "datistemplate": false, "database_owner": "create_drop_db_test_user"}] + [{"datacl": null, "datname": "mydatabase", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "create_drop_db_tablespace", "datallowconn": false, "datconnlimit": 10, "datistemplate": false, "database_owner": "create_drop_db_test_user"}] +(3 rows) + +SET citus.log_remote_commands = true; +set citus.grep_remote_commands = '%DROP DATABASE%'; +drop database mydatabase; +NOTICE: issuing DROP DATABASE mydatabase +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing DROP DATABASE mydatabase +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands = false; +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'mydatabase' + ) q2 + $$ +) ORDER BY result; + result +--------------------------------------------------------------------- + + + +(3 rows) + +-- create a template database with all options set and allow connections false +CREATE DATABASE my_template_database + WITH OWNER = create_drop_db_test_user + ENCODING = 'UTF8' + TABLESPACE = create_drop_db_tablespace + ALLOW_CONNECTIONS = false + IS_TEMPLATE = true; +SET citus.log_remote_commands = false; +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. 
datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'my_template_database' + ) q2 + $$ +) ORDER BY result; + result +--------------------------------------------------------------------- + [{"datacl": null, "datname": "my_template_database", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "create_drop_db_tablespace", "datallowconn": false, "datconnlimit": -1, "datistemplate": true, "database_owner": "create_drop_db_test_user"}] + [{"datacl": null, "datname": "my_template_database", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "create_drop_db_tablespace", "datallowconn": false, "datconnlimit": -1, "datistemplate": true, "database_owner": "create_drop_db_test_user"}] + [{"datacl": null, "datname": "my_template_database", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "create_drop_db_tablespace", "datallowconn": false, "datconnlimit": -1, "datistemplate": true, "database_owner": "create_drop_db_test_user"}] +(3 rows) + +--template databases could not be dropped so we need to change the template flag +SELECT result from run_command_on_all_nodes( + $$ + UPDATE pg_database SET datistemplate = false WHERE datname = 'my_template_database' + $$ +) ORDER BY result; + result +--------------------------------------------------------------------- + UPDATE 1 + UPDATE 1 + UPDATE 1 +(3 rows) + +SET citus.log_remote_commands = true; +set citus.grep_remote_commands = '%DROP DATABASE%'; +drop database my_template_database; +NOTICE: issuing DROP DATABASE my_template_database +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing DROP DATABASE my_template_database +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands = false; +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. 
datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'my_template_database' + ) q2 + $$ +) ORDER BY result; + result +--------------------------------------------------------------------- + + + +(3 rows) -\c - - - :master_port --tests for special characters in database name set citus.enable_create_database_propagation=on; SET citus.log_remote_commands = true; set citus.grep_remote_commands = '%CREATE DATABASE%'; create database "mydatabase#1'2"; -NOTICE: issuing SELECT pg_catalog.citus_internal_database_command('CREATE DATABASE "mydatabase#1''2"') +NOTICE: issuing CREATE DATABASE "mydatabase#1'2" DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT pg_catalog.citus_internal_database_command('CREATE DATABASE "mydatabase#1''2"') +NOTICE: issuing CREATE DATABASE "mydatabase#1'2" DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx set citus.grep_remote_commands = '%DROP DATABASE%'; drop database if exists "mydatabase#1'2"; -NOTICE: issuing SELECT pg_catalog.citus_internal_database_command('DROP DATABASE IF EXISTS "mydatabase#1''2"') +NOTICE: issuing DROP DATABASE IF EXISTS "mydatabase#1'2" DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT pg_catalog.citus_internal_database_command('DROP DATABASE IF EXISTS "mydatabase#1''2"') +NOTICE: issuing DROP DATABASE IF EXISTS "mydatabase#1'2" DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +--test for unsupported options +CREATE DATABASE mydatabase + with CONNECTION LIMIT = 10 + ENCODING = 'UTF8' + LC_CTYPE = 'C.UTF-8' + ALLOW_CONNECTIONS = false + IS_TEMPLATE = false; +ERROR: CREATE DATABASE option "lc_ctype" is not supported +CREATE DATABASE mydatabase + with CONNECTION LIMIT = 10 + ENCODING = 'UTF8' + LC_CTYPE = 'C.UTF-8' + ALLOW_CONNECTIONS = false + IS_TEMPLATE = false; +ERROR: CREATE DATABASE option "lc_ctype" is not supported +CREATE DATABASE mydatabase + with CONNECTION LIMIT = 10 + ENCODING = 'UTF8' + LC_COLLATE = 'C.UTF-8' + ALLOW_CONNECTIONS = false + IS_TEMPLATE = false; +ERROR: CREATE DATABASE option "lc_collate" is not supported +CREATE DATABASE mydatabase + with CONNECTION LIMIT = 10 + ENCODING = 'UTF8' + LOCALE = 'C.UTF-8' + ALLOW_CONNECTIONS = false + IS_TEMPLATE = false; +ERROR: CREATE DATABASE option "locale" is not supported --clean up resources created by this test drop tablespace create_drop_db_tablespace; \c - - - :worker_1_port diff --git a/src/test/regress/expected/create_drop_database_propagation_pg15.out b/src/test/regress/expected/create_drop_database_propagation_pg15.out new file mode 100644 index 000000000..bc6374803 --- /dev/null +++ b/src/test/regress/expected/create_drop_database_propagation_pg15.out @@ -0,0 +1,305 @@ +-- +-- PG15 +-- +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q +\endif +-- create/drop database for pg > 15 +\set create_drop_db_tablespace :abs_srcdir '/tmp_check/ts3' +CREATE TABLESPACE create_drop_db_tablespace LOCATION :'create_drop_db_tablespace'; +\c - - - :worker_1_port +\set create_drop_db_tablespace :abs_srcdir '/tmp_check/ts4' +CREATE TABLESPACE create_drop_db_tablespace LOCATION :'create_drop_db_tablespace'; +\c - - - :worker_2_port +\set create_drop_db_tablespace :abs_srcdir '/tmp_check/ts5' +CREATE TABLESPACE 
create_drop_db_tablespace LOCATION :'create_drop_db_tablespace'; +\c - - - :master_port +create user create_drop_db_test_user; +set citus.enable_create_database_propagation=on; +SET citus.log_remote_commands = true; +set citus.grep_remote_commands = '%CREATE DATABASE%'; +CREATE DATABASE mydatabase + WITH + OWNER = create_drop_db_test_user + CONNECTION LIMIT = 10 + ENCODING = 'UTF8' + TABLESPACE = create_drop_db_tablespace + ALLOW_CONNECTIONS = true + IS_TEMPLATE = false + OID = 966345; +NOTICE: issuing CREATE DATABASE mydatabase OWNER create_drop_db_test_user CONNECTION LIMIT 10 ENCODING 'UTF8' TABLESPACE create_drop_db_tablespace ALLOW_CONNECTIONS true IS_TEMPLATE false OID 966345 +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing CREATE DATABASE mydatabase OWNER create_drop_db_test_user CONNECTION LIMIT 10 ENCODING 'UTF8' TABLESPACE create_drop_db_tablespace ALLOW_CONNECTIONS true IS_TEMPLATE false OID 966345 +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands = false; +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'mydatabase' + ) q2 + $$ +) ORDER BY result; + result +--------------------------------------------------------------------- + [{"datacl": null, "datname": "mydatabase", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "create_drop_db_tablespace", "datallowconn": true, "datconnlimit": 10, "datistemplate": false, "database_owner": "create_drop_db_test_user"}] + [{"datacl": null, "datname": "mydatabase", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "create_drop_db_tablespace", "datallowconn": true, "datconnlimit": 10, "datistemplate": false, "database_owner": "create_drop_db_test_user"}] + [{"datacl": null, "datname": "mydatabase", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "create_drop_db_tablespace", "datallowconn": true, "datconnlimit": 10, "datistemplate": false, "database_owner": "create_drop_db_test_user"}] +(3 rows) + +SET citus.log_remote_commands = true; +set citus.grep_remote_commands = '%DROP DATABASE%'; +drop database mydatabase; +NOTICE: issuing DROP DATABASE mydatabase +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing DROP DATABASE mydatabase +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands = false; +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. 
datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'mydatabase' + ) q2 + $$ +) ORDER BY result; + result +--------------------------------------------------------------------- + + + +(3 rows) + +select citus_remove_node('localhost', :worker_2_port); + citus_remove_node +--------------------------------------------------------------------- + +(1 row) + +SET citus.log_remote_commands = true; +set citus.grep_remote_commands = '%CREATE DATABASE%'; +CREATE DATABASE mydatabase2 + WITH OWNER = create_drop_db_test_user + ENCODING = 'UTF8' + TABLESPACE = create_drop_db_tablespace + ALLOW_CONNECTIONS = true + IS_TEMPLATE = false + OID = 966345; +NOTICE: issuing CREATE DATABASE mydatabase2 OWNER create_drop_db_test_user ENCODING 'UTF8' TABLESPACE create_drop_db_tablespace ALLOW_CONNECTIONS true IS_TEMPLATE false OID 966345 +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands = false; +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'mydatabase2' + ) q2 + $$ +) ORDER BY result; + result +--------------------------------------------------------------------- + [{"datacl": null, "datname": "mydatabase2", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "create_drop_db_tablespace", "datallowconn": true, "datconnlimit": -1, "datistemplate": false, "database_owner": "create_drop_db_test_user"}] + [{"datacl": null, "datname": "mydatabase2", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "create_drop_db_tablespace", "datallowconn": true, "datconnlimit": -1, "datistemplate": false, "database_owner": "create_drop_db_test_user"}] +(2 rows) + +select 1 from citus_add_node('localhost', :worker_2_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. 
datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'mydatabase2' + ) q2 + $$ +) ORDER BY result; + result +--------------------------------------------------------------------- + [{"datacl": null, "datname": "mydatabase2", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "create_drop_db_tablespace", "datallowconn": true, "datconnlimit": -1, "datistemplate": false, "database_owner": "create_drop_db_test_user"}] + [{"datacl": null, "datname": "mydatabase2", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "create_drop_db_tablespace", "datallowconn": true, "datconnlimit": -1, "datistemplate": false, "database_owner": "create_drop_db_test_user"}] + [{"datacl": null, "datname": "mydatabase2", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "create_drop_db_tablespace", "datallowconn": true, "datconnlimit": -1, "datistemplate": false, "database_owner": "create_drop_db_test_user"}] +(3 rows) + +SET citus.log_remote_commands = true; +set citus.grep_remote_commands = '%DROP DATABASE%'; +drop database mydatabase2; +NOTICE: issuing DROP DATABASE mydatabase2 +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing DROP DATABASE mydatabase2 +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands = false; +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'mydatabase' + ) q2 + $$ +) ORDER BY result; + result +--------------------------------------------------------------------- + + + +(3 rows) + +SET citus.log_remote_commands = true; +set citus.grep_remote_commands = '%CREATE DATABASE%'; +-- create a template database with all options set and allow connections false +CREATE DATABASE my_template_database + WITH OWNER = create_drop_db_test_user + ENCODING = 'UTF8' + COLLATION_VERSION = '1.0' + TABLESPACE = create_drop_db_tablespace + ALLOW_CONNECTIONS = false + IS_TEMPLATE = true; +NOTICE: issuing CREATE DATABASE my_template_database OWNER create_drop_db_test_user ENCODING 'UTF8' COLLATION_VERSION '1.0' TABLESPACE create_drop_db_tablespace ALLOW_CONNECTIONS false IS_TEMPLATE true +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing CREATE DATABASE my_template_database OWNER create_drop_db_test_user ENCODING 'UTF8' COLLATION_VERSION '1.0' TABLESPACE create_drop_db_tablespace ALLOW_CONNECTIONS false IS_TEMPLATE true +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands = false; +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. 
datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'my_template_database' + ) q2 + $$ +) ORDER BY result; + result +--------------------------------------------------------------------- + [{"datacl": null, "datname": "my_template_database", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "create_drop_db_tablespace", "datallowconn": false, "datconnlimit": -1, "datistemplate": true, "database_owner": "create_drop_db_test_user"}] + [{"datacl": null, "datname": "my_template_database", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "create_drop_db_tablespace", "datallowconn": false, "datconnlimit": -1, "datistemplate": true, "database_owner": "create_drop_db_test_user"}] + [{"datacl": null, "datname": "my_template_database", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "create_drop_db_tablespace", "datallowconn": false, "datconnlimit": -1, "datistemplate": true, "database_owner": "create_drop_db_test_user"}] +(3 rows) + +SET citus.log_remote_commands = false; +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'my_template_database' + ) q2 + $$ +) ORDER BY result; + result +--------------------------------------------------------------------- + [{"datacl": null, "datname": "my_template_database", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "create_drop_db_tablespace", "datallowconn": false, "datconnlimit": -1, "datistemplate": true, "database_owner": "create_drop_db_test_user"}] + [{"datacl": null, "datname": "my_template_database", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "create_drop_db_tablespace", "datallowconn": false, "datconnlimit": -1, "datistemplate": true, "database_owner": "create_drop_db_test_user"}] + [{"datacl": null, "datname": "my_template_database", "datctype": "C", "encoding": "UTF8", "datcollate": "C", "tablespace": "create_drop_db_tablespace", "datallowconn": false, "datconnlimit": -1, "datistemplate": true, "database_owner": "create_drop_db_test_user"}] +(3 rows) + +SET citus.log_remote_commands = true; +--template databases could not be dropped so we need to change the template flag +SELECT result from run_command_on_all_nodes( + $$ + UPDATE pg_database SET datistemplate = false WHERE datname = 'my_template_database' + $$ +) ORDER BY result; + result +--------------------------------------------------------------------- + UPDATE 1 + UPDATE 1 + UPDATE 1 +(3 rows) + +set citus.grep_remote_commands = '%DROP DATABASE%'; +drop database my_template_database; +NOTICE: issuing DROP DATABASE my_template_database +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing DROP DATABASE my_template_database +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +SET citus.log_remote_commands = false; +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, 
pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'my_template_database' + ) q2 + $$ +) ORDER BY result; + result +--------------------------------------------------------------------- + + + +(3 rows) + +--tests for special characters in database name +set citus.enable_create_database_propagation=on; +SET citus.log_remote_commands = true; +set citus.grep_remote_commands = '%CREATE DATABASE%'; +create database "mydatabase#1'2"; +NOTICE: issuing CREATE DATABASE "mydatabase#1'2" +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing CREATE DATABASE "mydatabase#1'2" +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +set citus.grep_remote_commands = '%DROP DATABASE%'; +drop database if exists "mydatabase#1'2"; +NOTICE: issuing DROP DATABASE IF EXISTS "mydatabase#1'2" +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing DROP DATABASE IF EXISTS "mydatabase#1'2" +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +\c - - - :master_port +drop tablespace create_drop_db_tablespace; +\c - - - :worker_1_port +drop tablespace create_drop_db_tablespace; +\c - - - :worker_2_port +drop tablespace create_drop_db_tablespace; +\c - - - :master_port +drop user create_drop_db_test_user; diff --git a/src/test/regress/expected/create_drop_database_propagation_pg15_0.out b/src/test/regress/expected/create_drop_database_propagation_pg15_0.out new file mode 100644 index 000000000..b1ed9cc5b --- /dev/null +++ b/src/test/regress/expected/create_drop_database_propagation_pg15_0.out @@ -0,0 +1,9 @@ +-- +-- PG15 +-- +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q diff --git a/src/test/regress/expected/failure_create_index_concurrently.out b/src/test/regress/expected/failure_create_index_concurrently.out index a198ddc70..94d0f373d 100644 --- a/src/test/regress/expected/failure_create_index_concurrently.out +++ b/src/test/regress/expected/failure_create_index_concurrently.out @@ -26,8 +26,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE").kill()'); (1 row) CREATE INDEX CONCURRENTLY idx_index_test ON index_test(id, value_1); -WARNING: CONCURRENTLY-enabled index commands can fail partially, leaving behind an INVALID index. -Use DROP INDEX CONCURRENTLY IF EXISTS to remove the invalid index. +WARNING: Commands that are not transaction-safe may result in partial failure, potentially leading to an inconsistent state. +If the problematic command is a CREATE operation, consider using the 'IF EXISTS' syntax to drop the object, +if applicable, and then reattempt the original command. ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open SELECT citus.mitmproxy('conn.allow()'); mitmproxy @@ -59,8 +60,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE").kill()'); (1 row) CREATE INDEX CONCURRENTLY idx_index_test ON index_test(id, value_1); -WARNING: CONCURRENTLY-enabled index commands can fail partially, leaving behind an INVALID index. -Use DROP INDEX CONCURRENTLY IF EXISTS to remove the invalid index. +WARNING: Commands that are not transaction-safe may result in partial failure, potentially leading to an inconsistent state. 
+If the problematic command is a CREATE operation, consider using the 'IF EXISTS' syntax to drop the object, +if applicable, and then reattempt the original command. ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open SELECT citus.mitmproxy('conn.allow()'); mitmproxy @@ -86,8 +88,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE").cancel(' || pg_backend_pid( (1 row) CREATE INDEX CONCURRENTLY idx_index_test ON index_test(id, value_1); -WARNING: CONCURRENTLY-enabled index commands can fail partially, leaving behind an INVALID index. -Use DROP INDEX CONCURRENTLY IF EXISTS to remove the invalid index. +WARNING: Commands that are not transaction-safe may result in partial failure, potentially leading to an inconsistent state. +If the problematic command is a CREATE operation, consider using the 'IF EXISTS' syntax to drop the object, +if applicable, and then reattempt the original command. ERROR: canceling statement due to user request SELECT citus.mitmproxy('conn.allow()'); mitmproxy @@ -111,8 +114,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE").cancel(' || pg_backend_pid( (1 row) CREATE INDEX CONCURRENTLY idx_index_test ON index_test(id, value_1); -WARNING: CONCURRENTLY-enabled index commands can fail partially, leaving behind an INVALID index. -Use DROP INDEX CONCURRENTLY IF EXISTS to remove the invalid index. +WARNING: Commands that are not transaction-safe may result in partial failure, potentially leading to an inconsistent state. +If the problematic command is a CREATE operation, consider using the 'IF EXISTS' syntax to drop the object, +if applicable, and then reattempt the original command. ERROR: canceling statement due to user request SELECT citus.mitmproxy('conn.allow()'); mitmproxy @@ -137,8 +141,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="DROP INDEX CONCURRENTLY").kill()'); (1 row) DROP INDEX CONCURRENTLY IF EXISTS idx_index_test; -WARNING: CONCURRENTLY-enabled index commands can fail partially, leaving behind an INVALID index. -Use DROP INDEX CONCURRENTLY IF EXISTS to remove the invalid index. +WARNING: Commands that are not transaction-safe may result in partial failure, potentially leading to an inconsistent state. +If the problematic command is a CREATE operation, consider using the 'IF EXISTS' syntax to drop the object, +if applicable, and then reattempt the original command. ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open SELECT citus.mitmproxy('conn.allow()'); mitmproxy @@ -164,8 +169,9 @@ SELECT create_distributed_table('index_test_2', 'a'); INSERT INTO index_test_2 VALUES (1, 1), (1, 2); CREATE UNIQUE INDEX CONCURRENTLY index_test_2_a_idx ON index_test_2(a); -WARNING: CONCURRENTLY-enabled index commands can fail partially, leaving behind an INVALID index. -Use DROP INDEX CONCURRENTLY IF EXISTS to remove the invalid index. +WARNING: Commands that are not transaction-safe may result in partial failure, potentially leading to an inconsistent state. +If the problematic command is a CREATE operation, consider using the 'IF EXISTS' syntax to drop the object, +if applicable, and then reattempt the original command. ERROR: could not create unique index "index_test_2_a_idx_1880019" DETAIL: Key (a)=(1) is duplicated. 
CONTEXT: while executing command on localhost:xxxxx diff --git a/src/test/regress/expected/pg15.out b/src/test/regress/expected/pg15.out index caee521a7..fcbb0cd12 100644 --- a/src/test/regress/expected/pg15.out +++ b/src/test/regress/expected/pg15.out @@ -1536,121 +1536,3 @@ SET client_min_messages TO ERROR; DROP SCHEMA pg15 CASCADE; DROP ROLE rls_tenant_1; DROP ROLE rls_tenant_2; --- create/drop database for pg > 15 -\set create_drop_db_tablespace :abs_srcdir '/tmp_check/ts3' -CREATE TABLESPACE create_drop_db_tablespace LOCATION :'create_drop_db_tablespace'; -\c - - - :worker_1_port -\set create_drop_db_tablespace :abs_srcdir '/tmp_check/ts4' -CREATE TABLESPACE create_drop_db_tablespace LOCATION :'create_drop_db_tablespace'; -\c - - - :worker_2_port -\set create_drop_db_tablespace :abs_srcdir '/tmp_check/ts5' -CREATE TABLESPACE create_drop_db_tablespace LOCATION :'create_drop_db_tablespace'; -\c - - - :master_port -create user create_drop_db_test_user; -set citus.enable_create_database_propagation=on; -CREATE DATABASE mydatabase - WITH TEMPLATE = 'template0' - OWNER = create_drop_db_test_user - CONNECTION LIMIT = 10 - ENCODING = 'UTF8' - STRATEGY = 'wal_log' - LOCALE = '' - LC_COLLATE = 'POSIX' - LC_CTYPE = 'POSIX' - ICU_LOCALE = 'und' - LOCALE_PROVIDER = 'icu' - COLLATION_VERSION = '1.0' - TABLESPACE = create_drop_db_tablespace - ALLOW_CONNECTIONS = true - IS_TEMPLATE = false - OID = 966345; -SELECT pd.datname, pd.encoding, pd.datlocprovider, -pd.datistemplate, pd.datallowconn, pd.datconnlimit, -pd.datcollate , pd. datctype , pd.daticulocale, pd.datcollversion, -pd.datacl, rolname AS database_owner, pa.rolname AS database_owner, pt.spcname AS tablespace -FROM pg_database pd -JOIN pg_authid pa ON pd.datdba = pa.oid -join pg_tablespace pt on pd.dattablespace = pt.oid -WHERE datname = 'mydatabase'; - datname | encoding | datlocprovider | datistemplate | datallowconn | datconnlimit | datcollate | datctype | daticulocale | datcollversion | datacl | database_owner | database_owner | tablespace ---------------------------------------------------------------------- - mydatabase | 6 | i | f | t | 10 | C | C | und | 1.0 | | create_drop_db_test_user | create_drop_db_test_user | create_drop_db_tablespace -(1 row) - -\c - - - :worker_1_port -SELECT pd.datname, pd.encoding, pd.datlocprovider, -pd.datistemplate, pd.datallowconn, pd.datconnlimit, -pd.datcollate , pd. datctype , pd.daticulocale, pd.datcollversion, -pd.datacl, rolname AS database_owner, pa.rolname AS database_owner, pt.spcname AS tablespace -FROM pg_database pd -JOIN pg_authid pa ON pd.datdba = pa.oid -join pg_tablespace pt on pd.dattablespace = pt.oid -WHERE datname = 'mydatabase'; - datname | encoding | datlocprovider | datistemplate | datallowconn | datconnlimit | datcollate | datctype | daticulocale | datcollversion | datacl | database_owner | database_owner | tablespace ---------------------------------------------------------------------- - mydatabase | 6 | i | f | t | 10 | C | C | und | 1.0 | | create_drop_db_test_user | create_drop_db_test_user | create_drop_db_tablespace -(1 row) - -\c - - - :worker_2_port -SELECT pd.datname, pd.encoding, pd.datlocprovider, -pd.datistemplate, pd.datallowconn, pd.datconnlimit, -pd.datcollate , pd. 
datctype , pd.daticulocale, pd.datcollversion, -pd.datacl, rolname AS database_owner, pa.rolname AS database_owner, pt.spcname AS tablespace -FROM pg_database pd -JOIN pg_authid pa ON pd.datdba = pa.oid -join pg_tablespace pt on pd.dattablespace = pt.oid -WHERE datname = 'mydatabase'; - datname | encoding | datlocprovider | datistemplate | datallowconn | datconnlimit | datcollate | datctype | daticulocale | datcollversion | datacl | database_owner | database_owner | tablespace ---------------------------------------------------------------------- - mydatabase | 6 | i | f | t | 10 | C | C | und | 1.0 | | create_drop_db_test_user | create_drop_db_test_user | create_drop_db_tablespace -(1 row) - -\c - - - :master_port -set citus.enable_create_database_propagation=on; -drop database mydatabase; -SELECT pd.datname, pd.encoding, pd.datlocprovider, -pd.datistemplate, pd.datallowconn, pd.datconnlimit, -pd.datcollate , pd. datctype , pd.daticulocale, pd.datcollversion, -pd.datacl, rolname AS database_owner, pa.rolname AS database_owner, pt.spcname AS tablespace -FROM pg_database pd -JOIN pg_authid pa ON pd.datdba = pa.oid -join pg_tablespace pt on pd.dattablespace = pt.oid -WHERE datname = 'mydatabase'; - datname | encoding | datlocprovider | datistemplate | datallowconn | datconnlimit | datcollate | datctype | daticulocale | datcollversion | datacl | database_owner | database_owner | tablespace ---------------------------------------------------------------------- -(0 rows) - -\c - - - :worker_1_port -SELECT pd.datname, pd.encoding, pd.datlocprovider, -pd.datistemplate, pd.datallowconn, pd.datconnlimit, -pd.datcollate , pd. datctype , pd.daticulocale, pd.datcollversion, -pd.datacl, rolname AS database_owner, pa.rolname AS database_owner, pt.spcname AS tablespace -FROM pg_database pd -JOIN pg_authid pa ON pd.datdba = pa.oid -join pg_tablespace pt on pd.dattablespace = pt.oid -WHERE datname = 'mydatabase'; - datname | encoding | datlocprovider | datistemplate | datallowconn | datconnlimit | datcollate | datctype | daticulocale | datcollversion | datacl | database_owner | database_owner | tablespace ---------------------------------------------------------------------- -(0 rows) - -\c - - - :worker_2_port -SELECT pd.datname, pd.encoding, pd.datlocprovider, -pd.datistemplate, pd.datallowconn, pd.datconnlimit, -pd.datcollate , pd. 
datctype , pd.daticulocale, pd.datcollversion, -pd.datacl, rolname AS database_owner, pa.rolname AS database_owner, pt.spcname AS tablespace -FROM pg_database pd -JOIN pg_authid pa ON pd.datdba = pa.oid -join pg_tablespace pt on pd.dattablespace = pt.oid -WHERE datname = 'mydatabase'; - datname | encoding | datlocprovider | datistemplate | datallowconn | datconnlimit | datcollate | datctype | daticulocale | datcollversion | datacl | database_owner | database_owner | tablespace ---------------------------------------------------------------------- -(0 rows) - -\c - - - :master_port -drop tablespace create_drop_db_tablespace; -\c - - - :worker_1_port -drop tablespace create_drop_db_tablespace; -\c - - - :worker_2_port -drop tablespace create_drop_db_tablespace; -\c - - - :master_port -drop user create_drop_db_test_user; \ No newline at end of file diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index f485763c5..3b24fd5f5 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -88,8 +88,9 @@ SELECT create_distributed_table('failover_to_local', 'a', shard_count=>32); (1 row) CREATE INDEX CONCURRENTLY ON failover_to_local(a); -WARNING: CONCURRENTLY-enabled index commands can fail partially, leaving behind an INVALID index. - Use DROP INDEX CONCURRENTLY IF EXISTS to remove the invalid index. +WARNING: Commands that are not transaction-safe may result in partial failure, potentially leading to an inconsistent state. +If the problematic command is a CREATE operation, consider using the 'IF EXISTS' syntax to drop the object, +if applicable, and then reattempt the original command. ERROR: the total number of connections on the server is more than max_connections(100) HINT: Consider using a higher value for max_connections -- reset global GUC changes diff --git a/src/test/regress/expected/single_node_0.out b/src/test/regress/expected/single_node_0.out index 321d283f8..a44460cca 100644 --- a/src/test/regress/expected/single_node_0.out +++ b/src/test/regress/expected/single_node_0.out @@ -88,8 +88,9 @@ SELECT create_distributed_table('failover_to_local', 'a', shard_count=>32); (1 row) CREATE INDEX CONCURRENTLY ON failover_to_local(a); -WARNING: CONCURRENTLY-enabled index commands can fail partially, leaving behind an INVALID index. - Use DROP INDEX CONCURRENTLY IF EXISTS to remove the invalid index. +WARNING: Commands that are not transaction-safe may result in partial failure, potentially leading to an inconsistent state. +If the problematic command is a CREATE operation, consider using the 'IF EXISTS' syntax to drop the object, +if applicable, and then reattempt the original command. 
ERROR: the total number of connections on the server is more than max_connections(100) HINT: Consider using a higher value for max_connections -- reset global GUC changes diff --git a/src/test/regress/multi_1_schedule b/src/test/regress/multi_1_schedule index 309a0cde4..f53cb3a34 100644 --- a/src/test/regress/multi_1_schedule +++ b/src/test/regress/multi_1_schedule @@ -33,6 +33,9 @@ test: ref_citus_local_fkeys test: alter_database_owner test: distributed_triggers test: create_single_shard_table +test: create_drop_database_propagation +test: create_drop_database_propagation_pg15 + # don't parallelize single_shard_table_udfs to make sure colocation ids are sequential test: single_shard_table_udfs test: schema_based_sharding @@ -52,7 +55,6 @@ test: multi_read_from_secondaries test: grant_on_database_propagation test: alter_database_propagation -test: create_drop_database_propagation test: citus_shards diff --git a/src/test/regress/sql/columnar_create.sql b/src/test/regress/sql/columnar_create.sql index 408ce126e..a0708aeac 100644 --- a/src/test/regress/sql/columnar_create.sql +++ b/src/test/regress/sql/columnar_create.sql @@ -136,22 +136,34 @@ CREATE TEMPORARY TABLE columnar_temp(i int) USING columnar; -- reserve some chunks and a stripe INSERT INTO columnar_temp SELECT i FROM generate_series(1,5) i; -SELECT columnar.get_storage_id(oid) AS columnar_temp_storage_id -FROM pg_class WHERE relname='columnar_temp' \gset - -SELECT pg_backend_pid() AS val INTO old_backend_pid; +SELECT columnar.get_storage_id(oid) as oid INTO columnar_temp_storage_id +FROM pg_class WHERE relname='columnar_temp'; \c - - - :master_port SET search_path TO columnar_create; --- wait until old backend expires to make sure that temp table cleanup is complete -SELECT columnar_test_helpers.pg_waitpid(val) FROM old_backend_pid; +-- wait until the temporary table and its metadata are removed +DO $$ +DECLARE + loop_wait_count integer := 0; +BEGIN + WHILE ( + (SELECT COUNT(*) > 0 FROM pg_class WHERE relname='columnar_temp') OR + (SELECT columnar_test_helpers.columnar_metadata_has_storage_id(oid) FROM columnar_temp_storage_id) + ) + LOOP + IF loop_wait_count > 1000 THEN + RAISE EXCEPTION 'Timeout while waiting for temporary table to be dropped'; + END IF; -DROP TABLE old_backend_pid; + PERFORM pg_sleep(0.001); --- show that temporary table itself and its metadata is removed -SELECT COUNT(*)=0 FROM pg_class WHERE relname='columnar_temp'; -SELECT columnar_test_helpers.columnar_metadata_has_storage_id(:columnar_temp_storage_id); + loop_wait_count := loop_wait_count + 1; + END LOOP; +END; +$$ language plpgsql; + +DROP TABLE columnar_temp_storage_id; -- connect to another session and create a temp table with same name CREATE TEMPORARY TABLE columnar_temp(i int) USING columnar; diff --git a/src/test/regress/sql/create_drop_database_propagation.sql b/src/test/regress/sql/create_drop_database_propagation.sql index d84654054..ae90088d1 100644 --- a/src/test/regress/sql/create_drop_database_propagation.sql +++ b/src/test/regress/sql/create_drop_database_propagation.sql @@ -1,5 +1,7 @@ - +-- test for create/drop database propagation +-- This test only executes for PostgreSQL 14 +-- For PostgreSQL 15 tests, create_drop_database_propagation_pg15.sql is used \set create_drop_db_tablespace :abs_srcdir '/tmp_check/ts3' CREATE TABLESPACE create_drop_db_tablespace LOCATION :'create_drop_db_tablespace'; @@ -16,83 +18,169 @@ create user create_drop_db_test_user; set citus.enable_create_database_propagation=on; + CREATE DATABASE mydatabase - WITH TEMPLATE
= 'template0' - OWNER = create_drop_db_test_user - CONNECTION LIMIT = 10 + WITH OWNER = create_drop_db_test_user ENCODING = 'UTF8' - LC_COLLATE = 'C' - LC_CTYPE = 'C' + CONNECTION LIMIT = 10 TABLESPACE = create_drop_db_tablespace ALLOW_CONNECTIONS = true IS_TEMPLATE = false; -SELECT pd.datname, pd.encoding, -pd.datistemplate, pd.datallowconn, pd.datconnlimit, -pd.datcollate , pd. datctype , pd.datacl, rolname AS database_owner, -pa.rolname AS database_owner, pt.spcname AS tablespace -FROM pg_database pd -JOIN pg_authid pa ON pd.datdba = pa.oid -join pg_tablespace pt on pd.dattablespace = pt.oid -WHERE datname = 'mydatabase'; +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'mydatabase' + ) q2 + $$ +) ORDER BY result; -\c - - - :worker_1_port -SELECT pd.datname, pd.encoding, -pd.datistemplate, pd.datallowconn, pd.datconnlimit, -pd.datcollate , pd. datctype , pd.datacl, rolname AS database_owner, -pa.rolname AS database_owner, pt.spcname AS tablespace -FROM pg_database pd -JOIN pg_authid pa ON pd.datdba = pa.oid -join pg_tablespace pt on pd.dattablespace = pt.oid -WHERE datname = 'mydatabase'; -\c - - - :worker_2_port - -SELECT pd.datname, pd.encoding, -pd.datistemplate, pd.datallowconn, pd.datconnlimit, -pd.datcollate , pd. datctype , pd.datacl, rolname AS database_owner, -pa.rolname AS database_owner, pt.spcname AS tablespace -FROM pg_database pd -JOIN pg_authid pa ON pd.datdba = pa.oid -join pg_tablespace pt on pd.dattablespace = pt.oid -WHERE datname = 'mydatabase'; - -\c - - - :master_port -set citus.enable_create_database_propagation=on; drop database mydatabase; -SELECT pd.datname, pd.encoding, -pd.datistemplate, pd.datallowconn, pd.datconnlimit, -pd.datcollate , pd. datctype , pd.datacl, rolname AS database_owner, -pa.rolname AS database_owner, pt.spcname AS tablespace -FROM pg_database pd -JOIN pg_authid pa ON pd.datdba = pa.oid -join pg_tablespace pt on pd.dattablespace = pt.oid -WHERE datname = 'mydatabase'; -\c - - - :worker_1_port -SELECT pd.datname, pd.encoding, -pd.datistemplate, pd.datallowconn, pd.datconnlimit, -pd.datcollate , pd. datctype , pd.datacl, rolname AS database_owner, -pa.rolname AS database_owner, pt.spcname AS tablespace -FROM pg_database pd -JOIN pg_authid pa ON pd.datdba = pa.oid -join pg_tablespace pt on pd.dattablespace = pt.oid -WHERE datname = 'mydatabase'; +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'mydatabase' + ) q2 + $$ +) ORDER BY result; -\c - - - :worker_2_port +-- test database syncing after node addition -SELECT pd.datname, pd.encoding, -pd.datistemplate, pd.datallowconn, pd.datconnlimit, -pd.datcollate , pd. 
datctype , pd.datacl, rolname AS database_owner, -pa.rolname AS database_owner, pt.spcname AS tablespace -FROM pg_database pd -JOIN pg_authid pa ON pd.datdba = pa.oid -join pg_tablespace pt on pd.dattablespace = pt.oid -WHERE datname = 'mydatabase'; +select 1 from citus_remove_node('localhost', :worker_2_port); -\c - - - :master_port +--test with allow_connections false +CREATE DATABASE mydatabase + OWNER = create_drop_db_test_user + CONNECTION LIMIT = 10 + ENCODING = 'UTF8' + TABLESPACE = create_drop_db_tablespace + ALLOW_CONNECTIONS = false + IS_TEMPLATE = false; + + +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'mydatabase' + ) q2 + $$ +) ORDER BY result; + +select 1 from citus_add_node('localhost', :worker_2_port); + +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'mydatabase' + ) q2 + $$ +) ORDER BY result; + +SET citus.log_remote_commands = true; +set citus.grep_remote_commands = '%DROP DATABASE%'; +drop database mydatabase; + +SET citus.log_remote_commands = false; + +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'mydatabase' + ) q2 + $$ +) ORDER BY result; + +-- create a template database with all options set and allow connections false +CREATE DATABASE my_template_database + WITH OWNER = create_drop_db_test_user + ENCODING = 'UTF8' + TABLESPACE = create_drop_db_tablespace + ALLOW_CONNECTIONS = false + IS_TEMPLATE = true; + +SET citus.log_remote_commands = false; + +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd.
datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'my_template_database' + ) q2 + $$ +) ORDER BY result; + +--template databases cannot be dropped, so we need to change the template flag first +SELECT result from run_command_on_all_nodes( + $$ + UPDATE pg_database SET datistemplate = false WHERE datname = 'my_template_database' + $$ +) ORDER BY result; + +SET citus.log_remote_commands = true; + +set citus.grep_remote_commands = '%DROP DATABASE%'; +drop database my_template_database; + +SET citus.log_remote_commands = false; +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'my_template_database' + ) q2 + $$ +) ORDER BY result; --tests for special characters in database name set citus.enable_create_database_propagation=on; @@ -104,6 +192,37 @@ create database "mydatabase#1'2"; set citus.grep_remote_commands = '%DROP DATABASE%'; drop database if exists "mydatabase#1'2"; +--test for unsupported options + +CREATE DATABASE mydatabase + with CONNECTION LIMIT = 10 + ENCODING = 'UTF8' + LC_CTYPE = 'C.UTF-8' + ALLOW_CONNECTIONS = false + IS_TEMPLATE = false; + +CREATE DATABASE mydatabase + with CONNECTION LIMIT = 10 + ENCODING = 'UTF8' + LC_CTYPE = 'C.UTF-8' + ALLOW_CONNECTIONS = false + IS_TEMPLATE = false; + +CREATE DATABASE mydatabase + with CONNECTION LIMIT = 10 + ENCODING = 'UTF8' + LC_COLLATE = 'C.UTF-8' + ALLOW_CONNECTIONS = false + IS_TEMPLATE = false; + +CREATE DATABASE mydatabase + with CONNECTION LIMIT = 10 + ENCODING = 'UTF8' + LOCALE = 'C.UTF-8' + ALLOW_CONNECTIONS = false + IS_TEMPLATE = false; + + --clean up resources created by this test drop tablespace create_drop_db_tablespace; diff --git a/src/test/regress/sql/create_drop_database_propagation_pg15.sql b/src/test/regress/sql/create_drop_database_propagation_pg15.sql new file mode 100644 index 000000000..ca3e3b202 --- /dev/null +++ b/src/test/regress/sql/create_drop_database_propagation_pg15.sql @@ -0,0 +1,244 @@ +-- +-- PG15 +-- +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int >= 15 AS server_version_ge_15 +\gset +\if :server_version_ge_15 +\else +\q +\endif + +-- create/drop database for pg > 15 + + +\set create_drop_db_tablespace :abs_srcdir '/tmp_check/ts3' +CREATE TABLESPACE create_drop_db_tablespace LOCATION :'create_drop_db_tablespace'; + +\c - - - :worker_1_port +\set create_drop_db_tablespace :abs_srcdir '/tmp_check/ts4' +CREATE TABLESPACE create_drop_db_tablespace LOCATION :'create_drop_db_tablespace'; + +\c - - - :worker_2_port +\set create_drop_db_tablespace :abs_srcdir '/tmp_check/ts5' +CREATE TABLESPACE create_drop_db_tablespace LOCATION :'create_drop_db_tablespace'; + +\c - - - :master_port +create user create_drop_db_test_user; +set citus.enable_create_database_propagation=on; +SET citus.log_remote_commands = true; +set citus.grep_remote_commands = '%CREATE DATABASE%'; +CREATE DATABASE mydatabase + WITH + OWNER = create_drop_db_test_user + CONNECTION LIMIT = 10 + ENCODING = 'UTF8' + TABLESPACE = create_drop_db_tablespace + ALLOW_CONNECTIONS
= true + IS_TEMPLATE = false + OID = 966345; + +SET citus.log_remote_commands = false; + +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'mydatabase' + ) q2 + $$ +) ORDER BY result; + +SET citus.log_remote_commands = true; +set citus.grep_remote_commands = '%DROP DATABASE%'; +drop database mydatabase; + +SET citus.log_remote_commands = false; +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'mydatabase' + ) q2 + $$ +) ORDER BY result; + +select citus_remove_node('localhost', :worker_2_port); + + +SET citus.log_remote_commands = true; +set citus.grep_remote_commands = '%CREATE DATABASE%'; + +CREATE DATABASE mydatabase2 + WITH OWNER = create_drop_db_test_user + ENCODING = 'UTF8' + TABLESPACE = create_drop_db_tablespace + ALLOW_CONNECTIONS = true + IS_TEMPLATE = false + OID = 966345; + +SET citus.log_remote_commands = false; +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'mydatabase2' + ) q2 + $$ +) ORDER BY result; + + +select 1 from citus_add_node('localhost', :worker_2_port); + +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'mydatabase2' + ) q2 + $$ +) ORDER BY result; + +SET citus.log_remote_commands = true; +set citus.grep_remote_commands = '%DROP DATABASE%'; +drop database mydatabase2; + +SET citus.log_remote_commands = false; + +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. 
datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'mydatabase2' + ) q2 + $$ +) ORDER BY result; + +SET citus.log_remote_commands = true; +set citus.grep_remote_commands = '%CREATE DATABASE%'; + +-- create a template database with all options set and allow connections false +CREATE DATABASE my_template_database + WITH OWNER = create_drop_db_test_user + ENCODING = 'UTF8' + COLLATION_VERSION = '1.0' + TABLESPACE = create_drop_db_tablespace + ALLOW_CONNECTIONS = false + IS_TEMPLATE = true; + +SET citus.log_remote_commands = false; + +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'my_template_database' + ) q2 + $$ +) ORDER BY result; + + +SET citus.log_remote_commands = false; + +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd. datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'my_template_database' + ) q2 + $$ +) ORDER BY result; + +SET citus.log_remote_commands = true; + +--template databases cannot be dropped, so we need to change the template flag first +SELECT result from run_command_on_all_nodes( + $$ + UPDATE pg_database SET datistemplate = false WHERE datname = 'my_template_database' + $$ +) ORDER BY result; + + +set citus.grep_remote_commands = '%DROP DATABASE%'; +drop database my_template_database; + +SET citus.log_remote_commands = false; +SELECT result from run_command_on_all_nodes( + $$ + SELECT jsonb_agg(to_jsonb(q2.*)) FROM ( + SELECT pd.datname, pg_encoding_to_char(pd.encoding) as encoding, + pd.datistemplate, pd.datallowconn, pd.datconnlimit, + pd.datcollate , pd.
datctype , pd.datacl, + pa.rolname AS database_owner, pt.spcname AS tablespace + FROM pg_database pd + JOIN pg_authid pa ON pd.datdba = pa.oid + join pg_tablespace pt on pd.dattablespace = pt.oid + WHERE datname = 'my_template_database' + ) q2 + $$ +) ORDER BY result; + + +--tests for special characters in database name +set citus.enable_create_database_propagation=on; +SET citus.log_remote_commands = true; +set citus.grep_remote_commands = '%CREATE DATABASE%'; + +create database "mydatabase#1'2"; + +set citus.grep_remote_commands = '%DROP DATABASE%'; +drop database if exists "mydatabase#1'2"; + +\c - - - :master_port +drop tablespace create_drop_db_tablespace; + +\c - - - :worker_1_port +drop tablespace create_drop_db_tablespace; + +\c - - - :worker_2_port +drop tablespace create_drop_db_tablespace; + +\c - - - :master_port +drop user create_drop_db_test_user; diff --git a/src/test/regress/sql/pg15.sql b/src/test/regress/sql/pg15.sql index f0c6706d2..fe60222dd 100644 --- a/src/test/regress/sql/pg15.sql +++ b/src/test/regress/sql/pg15.sql @@ -976,115 +976,3 @@ SET client_min_messages TO ERROR; DROP SCHEMA pg15 CASCADE; DROP ROLE rls_tenant_1; DROP ROLE rls_tenant_2; - - --- create/drop database for pg > 15 - - -\set create_drop_db_tablespace :abs_srcdir '/tmp_check/ts3' -CREATE TABLESPACE create_drop_db_tablespace LOCATION :'create_drop_db_tablespace'; - -\c - - - :worker_1_port -\set create_drop_db_tablespace :abs_srcdir '/tmp_check/ts4' -CREATE TABLESPACE create_drop_db_tablespace LOCATION :'create_drop_db_tablespace'; - -\c - - - :worker_2_port -\set create_drop_db_tablespace :abs_srcdir '/tmp_check/ts5' -CREATE TABLESPACE create_drop_db_tablespace LOCATION :'create_drop_db_tablespace'; - -\c - - - :master_port -create user create_drop_db_test_user; -set citus.enable_create_database_propagation=on; -CREATE DATABASE mydatabase - WITH TEMPLATE = 'template0' - OWNER = create_drop_db_test_user - CONNECTION LIMIT = 10 - ENCODING = 'UTF8' - STRATEGY = 'wal_log' - LOCALE = '' - LC_COLLATE = 'POSIX' - LC_CTYPE = 'POSIX' - ICU_LOCALE = 'und' - LOCALE_PROVIDER = 'icu' - COLLATION_VERSION = '1.0' - TABLESPACE = create_drop_db_tablespace - ALLOW_CONNECTIONS = true - IS_TEMPLATE = false - OID = 966345; - -SELECT pd.datname, pd.encoding, pd.datlocprovider, -pd.datistemplate, pd.datallowconn, pd.datconnlimit, -pd.datcollate , pd. datctype , pd.daticulocale, pd.datcollversion, -pd.datacl, rolname AS database_owner, pa.rolname AS database_owner, pt.spcname AS tablespace -FROM pg_database pd -JOIN pg_authid pa ON pd.datdba = pa.oid -join pg_tablespace pt on pd.dattablespace = pt.oid -WHERE datname = 'mydatabase'; - -\c - - - :worker_1_port -SELECT pd.datname, pd.encoding, pd.datlocprovider, -pd.datistemplate, pd.datallowconn, pd.datconnlimit, -pd.datcollate , pd. datctype , pd.daticulocale, pd.datcollversion, -pd.datacl, rolname AS database_owner, pa.rolname AS database_owner, pt.spcname AS tablespace -FROM pg_database pd -JOIN pg_authid pa ON pd.datdba = pa.oid -join pg_tablespace pt on pd.dattablespace = pt.oid -WHERE datname = 'mydatabase'; - - -\c - - - :worker_2_port -SELECT pd.datname, pd.encoding, pd.datlocprovider, -pd.datistemplate, pd.datallowconn, pd.datconnlimit, -pd.datcollate , pd. 
datctype , pd.daticulocale, pd.datcollversion, -pd.datacl, rolname AS database_owner, pa.rolname AS database_owner, pt.spcname AS tablespace -FROM pg_database pd -JOIN pg_authid pa ON pd.datdba = pa.oid -join pg_tablespace pt on pd.dattablespace = pt.oid -WHERE datname = 'mydatabase'; - - -\c - - - :master_port -set citus.enable_create_database_propagation=on; -drop database mydatabase; -SELECT pd.datname, pd.encoding, pd.datlocprovider, -pd.datistemplate, pd.datallowconn, pd.datconnlimit, -pd.datcollate , pd. datctype , pd.daticulocale, pd.datcollversion, -pd.datacl, rolname AS database_owner, pa.rolname AS database_owner, pt.spcname AS tablespace -FROM pg_database pd -JOIN pg_authid pa ON pd.datdba = pa.oid -join pg_tablespace pt on pd.dattablespace = pt.oid -WHERE datname = 'mydatabase'; - - -\c - - - :worker_1_port -SELECT pd.datname, pd.encoding, pd.datlocprovider, -pd.datistemplate, pd.datallowconn, pd.datconnlimit, -pd.datcollate , pd. datctype , pd.daticulocale, pd.datcollversion, -pd.datacl, rolname AS database_owner, pa.rolname AS database_owner, pt.spcname AS tablespace -FROM pg_database pd -JOIN pg_authid pa ON pd.datdba = pa.oid -join pg_tablespace pt on pd.dattablespace = pt.oid -WHERE datname = 'mydatabase'; - -\c - - - :worker_2_port - -SELECT pd.datname, pd.encoding, pd.datlocprovider, -pd.datistemplate, pd.datallowconn, pd.datconnlimit, -pd.datcollate , pd. datctype , pd.daticulocale, pd.datcollversion, -pd.datacl, rolname AS database_owner, pa.rolname AS database_owner, pt.spcname AS tablespace -FROM pg_database pd -JOIN pg_authid pa ON pd.datdba = pa.oid -join pg_tablespace pt on pd.dattablespace = pt.oid -WHERE datname = 'mydatabase'; - -\c - - - :master_port -drop tablespace create_drop_db_tablespace; - -\c - - - :worker_1_port -drop tablespace create_drop_db_tablespace; - -\c - - - :worker_2_port -drop tablespace create_drop_db_tablespace; - -\c - - - :master_port -drop user create_drop_db_test_user;
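A note on the verification idiom that this patch repeats throughout: instead of reconnecting to each node with \c and comparing psql output column-by-column, every node serializes the pg_database columns of interest into a single jsonb value, so run_command_on_all_nodes returns one comparable result per node and ORDER BY result keeps the output deterministic regardless of node ordering. A minimal standalone sketch of the same pattern, assuming a running Citus cluster; the database name db_under_test is illustrative and not taken from the patch:

SELECT result FROM run_command_on_all_nodes(
    $$
    -- executed on each node; jsonb_agg collapses the matching row
    -- (or the absence of one) into a single value per node
    SELECT jsonb_agg(to_jsonb(q.*)) FROM (
        SELECT pd.datname, pd.datallowconn, pd.datconnlimit,
               pa.rolname AS database_owner
        FROM pg_database pd
        JOIN pg_authid pa ON pd.datdba = pa.oid
        WHERE pd.datname = 'db_under_test' -- hypothetical name for illustration
    ) q
    $$
) ORDER BY result;

When the database is absent on a node, jsonb_agg over zero rows yields NULL, which surfaces as an empty result rather than an error; the post-DROP checks in the expected outputs above (three empty rows, one per node) rely on exactly that behavior.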