From 65bd540943cfa36ca886ef3025c7e4920a3e8ad9 Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Tue, 1 Mar 2022 15:29:31 +0100 Subject: [PATCH] Feature: configure object propagation behaviour in transactions (#5724) DESCRIPTION: Add GUC to control ddl creation behaviour in transactions Historically we would _not_ propagate objects when we are in a transaction block. Creation of distributed tables would not always work in sequential mode, hence objects created in the same transaction as distributing a table that would use the just created object wouldn't work. The benefit was that the user could still benefit from parallelism. Now that the creation of distributed tables is supported in sequential mode it would make sense for users to force transactional consistency of ddl commands for distributed tables. A transaction could switch more aggressively to sequential mode when creating new objects in a transaction. We don't change the default behaviour just yet. Also, many objects would not even propagate their creation when the transaction was already set to sequential, leaving the probability of a self deadlock. The new policy checks solve this discrepancy between objects as well. --- src/backend/distributed/commands/collation.c | 13 +-- .../distributed/commands/dependencies.c | 84 +++++++++++++++ src/backend/distributed/commands/extension.c | 7 +- .../distributed/commands/foreign_server.c | 29 ++++- src/backend/distributed/commands/schema.c | 4 +- .../distributed/commands/text_search.c | 12 +-- src/backend/distributed/commands/type.c | 2 +- .../distributed/commands/utility_hook.c | 1 + .../distributed/executor/multi_executor.c | 5 + src/backend/distributed/shared_library_init.c | 25 +++++ .../distributed/commands/utility_hook.h | 8 ++ src/include/distributed/metadata_utility.h | 1 + .../regress/expected/distributed_types.out | 101 ++++++++++++++++++ src/test/regress/sql/distributed_types.sql | 41 +++++++ 14 files changed, 308 insertions(+), 25 deletions(-) diff --git a/src/backend/distributed/commands/collation.c b/src/backend/distributed/commands/collation.c index 7f047ec1d..12bf1404a 100644 --- a/src/backend/distributed/commands/collation.c +++ b/src/backend/distributed/commands/collation.c @@ -530,11 +530,14 @@ PreprocessDefineCollationStmt(Node *node, const char *queryString, { Assert(castNode(DefineStmt, node)->kind == OBJECT_COLLATION); - if (ShouldPropagateDefineCollationStmt()) + if (!ShouldPropagateDefineCollationStmt()) { - EnsureCoordinator(); + return NIL; } + EnsureCoordinator(); + EnsureSequentialMode(OBJECT_COLLATION); + return NIL; } @@ -575,8 +578,7 @@ PostprocessDefineCollationStmt(Node *node, const char *queryString) * ShouldPropagateDefineCollationStmt checks if collation define * statement should be propagated. Don't propagate if: * - metadata syncing if off - * - statement is part of a multi stmt transaction and the multi shard connection - * type is not sequential + * - create statement should be propagated according the the ddl propagation policy */ static bool ShouldPropagateDefineCollationStmt() @@ -586,8 +588,7 @@ ShouldPropagateDefineCollationStmt() return false; } - if (IsMultiStatementTransaction() && - MultiShardConnectionType != SEQUENTIAL_CONNECTION) + if (!ShouldPropagateCreateInCoordinatedTransction()) { return false; } diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index c1bd4c340..fe6e651fa 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -20,6 +20,8 @@ #include "distributed/metadata/dependency.h" #include "distributed/metadata/distobject.h" #include "distributed/metadata_sync.h" +#include "distributed/multi_executor.h" +#include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" @@ -459,6 +461,88 @@ ShouldPropagate(void) } +/* + * ShouldPropagateCreateInCoordinatedTransction returns based the current state of the + * session and policies if Citus needs to propagate the creation of new objects. + * + * Creation of objects on other nodes could be postponed till the object is actually used + * in a sharded object (eg. distributed table or index on a distributed table). In certain + * use cases the opportunity for parallelism in a transaction block is preferred. When + * configured like that the creation of an object might be postponed and backfilled till + * the object is actually used. + */ +bool +ShouldPropagateCreateInCoordinatedTransction() +{ + if (!IsMultiStatementTransaction()) + { + /* + * If we are in a single statement transaction we will always propagate the + * creation of objects. There are no downsides in regard to performance or + * transactional limitations. These only arise with transaction blocks consisting + * of multiple statements. + */ + return true; + } + + if (MultiShardConnectionType == SEQUENTIAL_CONNECTION) + { + /* + * If we are in a transaction that is already switched to sequential, either by + * the user, or automatically by an other command, we will always propagate the + * creation of new objects to the workers. + * + * This guarantees no strange anomalies when the transaction aborts or on + * visibility of the newly created object. + */ + return true; + } + + switch (CreateObjectPropagationMode) + { + case CREATE_OBJECT_PROPAGATION_DEFERRED: + { + /* + * We prefer parallelism at this point. Since we did not already return while + * checking for sequential mode we are still in parallel mode. We don't want + * to switch that now, thus not propagating the creation. + */ + return false; + } + + case CREATE_OBJECT_PROPAGATION_AUTOMATIC: + { + /* + * When we run in optimistic mode we want to switch to sequential mode, only + * if this would _not_ give an error to the user. Meaning, we either are + * already in sequential mode (checked earlier), or there has been no parallel + * execution in the current transaction block. + * + * If switching to sequential would throw an error we would stay in parallel + * mode while creating new objects. We will rely on Citus' mechanism to ensure + * the existence if the object would be used in the same transaction. + */ + if (ParallelQueryExecutedInTransaction()) + { + return false; + } + + return true; + } + + case CREATE_OBJECT_PROPAGATION_IMMEDIATE: + { + return true; + } + + default: + { + elog(ERROR, "unsupported ddl propagation mode"); + } + } +} + + /* * ShouldPropagateObject determines if we should be propagating DDLs based * on their object address. diff --git a/src/backend/distributed/commands/extension.c b/src/backend/distributed/commands/extension.c index fac8a783a..f585b6a67 100644 --- a/src/backend/distributed/commands/extension.c +++ b/src/backend/distributed/commands/extension.c @@ -136,11 +136,8 @@ PostprocessCreateExtensionStmt(Node *node, const char *queryString) return NIL; } - /* - * If the extension command is a part of a multi-statement transaction, - * do not propagate it - */ - if (IsMultiStatementTransaction()) + /* check creation against multi-statement transaction policy */ + if (!ShouldPropagateCreateInCoordinatedTransction()) { return NIL; } diff --git a/src/backend/distributed/commands/foreign_server.c b/src/backend/distributed/commands/foreign_server.c index ad1802ddb..0777814df 100644 --- a/src/backend/distributed/commands/foreign_server.c +++ b/src/backend/distributed/commands/foreign_server.c @@ -17,6 +17,7 @@ #include "distributed/listutils.h" #include "distributed/metadata/distobject.h" #include "distributed/metadata_sync.h" +#include "distributed/multi_executor.h" #include "distributed/worker_transaction.h" #include "foreign/foreign.h" #include "nodes/makefuncs.h" @@ -41,7 +42,14 @@ PreprocessCreateForeignServerStmt(Node *node, const char *queryString, return NIL; } + /* check creation against multi-statement transaction policy */ + if (!ShouldPropagateCreateInCoordinatedTransction()) + { + return NIL; + } + EnsureCoordinator(); + EnsureSequentialMode(OBJECT_FOREIGN_SERVER); char *sql = DeparseTreeNode(node); @@ -209,7 +217,18 @@ PreprocessDropForeignServerStmt(Node *node, const char *queryString, List * PostprocessCreateForeignServerStmt(Node *node, const char *queryString) { - bool missingOk = false; + if (!ShouldPropagate()) + { + return NIL; + } + + /* check creation against multi-statement transaction policy */ + if (!ShouldPropagateCreateInCoordinatedTransction()) + { + return NIL; + } + + const bool missingOk = false; ObjectAddress address = GetObjectAddressFromParseTree(node, missingOk); EnsureDependenciesExistOnAllNodes(&address); @@ -224,8 +243,14 @@ PostprocessCreateForeignServerStmt(Node *node, const char *queryString) List * PostprocessAlterForeignServerOwnerStmt(Node *node, const char *queryString) { - bool missingOk = false; + const bool missingOk = false; ObjectAddress address = GetObjectAddressFromParseTree(node, missingOk); + + if (!ShouldPropagateObject(&address)) + { + return NIL; + } + EnsureDependenciesExistOnAllNodes(&address); return NIL; diff --git a/src/backend/distributed/commands/schema.c b/src/backend/distributed/commands/schema.c index 77f3fcc32..635e7adde 100644 --- a/src/backend/distributed/commands/schema.c +++ b/src/backend/distributed/commands/schema.c @@ -367,8 +367,8 @@ ShouldPropagateCreateSchemaStmt() return false; } - if (IsMultiStatementTransaction() && - MultiShardConnectionType != SEQUENTIAL_CONNECTION) + /* check creation against multi-statement transaction policy */ + if (!ShouldPropagateCreateInCoordinatedTransction()) { return false; } diff --git a/src/backend/distributed/commands/text_search.c b/src/backend/distributed/commands/text_search.c index be78057f7..53080c42b 100644 --- a/src/backend/distributed/commands/text_search.c +++ b/src/backend/distributed/commands/text_search.c @@ -73,16 +73,10 @@ PostprocessCreateTextSearchConfigurationStmt(Node *node, const char *queryString return NIL; } - /* - * If the create command is a part of a multi-statement transaction that is not in - * sequential mode, don't propagate. Instead we will rely on back filling. - */ - if (IsMultiStatementTransaction()) + /* check creation against multi-statement transaction policy */ + if (!ShouldPropagateCreateInCoordinatedTransction()) { - if (MultiShardConnectionType != SEQUENTIAL_CONNECTION) - { - return NIL; - } + return NIL; } EnsureCoordinator(); diff --git a/src/backend/distributed/commands/type.c b/src/backend/distributed/commands/type.c index 47dd0c307..c124388d4 100644 --- a/src/backend/distributed/commands/type.c +++ b/src/backend/distributed/commands/type.c @@ -1151,7 +1151,7 @@ ShouldPropagateTypeCreate() * this type will be used as a column in a table that will be created and distributed * in this same transaction. */ - if (IsMultiStatementTransaction()) + if (!ShouldPropagateCreateInCoordinatedTransction()) { return false; } diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 1611da462..91e02a8ff 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -78,6 +78,7 @@ #include "utils/syscache.h" bool EnableDDLPropagation = true; /* ddl propagation is enabled */ +int CreateObjectPropagationMode = CREATE_OBJECT_PROPAGATION_DEFERRED; PropSetCmdBehavior PropagateSetCommands = PROPSETCMD_NONE; /* SET prop off */ static bool shouldInvalidateForeignKeyGraph = false; static int activeAlterTables = 0; diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 28498e0f2..f43235aac 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -763,6 +763,11 @@ GetObjectTypeString(ObjectType objType) return "extension"; } + case OBJECT_FOREIGN_SERVER: + { + return "foreign server"; + } + case OBJECT_FUNCTION: { return "function"; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index e6b98a843..7897cbefe 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -218,6 +218,13 @@ static const struct config_enum_entry explain_analyze_sort_method_options[] = { { NULL, 0, false } }; +static const struct config_enum_entry create_object_propagation_options[] = { + {"deferred", CREATE_OBJECT_PROPAGATION_DEFERRED, false}, + {"automatic", CREATE_OBJECT_PROPAGATION_AUTOMATIC, false}, + {"immediate", CREATE_OBJECT_PROPAGATION_IMMEDIATE, false}, + {NULL, 0, false} +}; + /* *INDENT-ON* */ @@ -669,6 +676,24 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomEnumVariable( + "citus.create_object_propagation", + gettext_noop("Controls the behavior of CREATE statements in transactions for " + "supported objects"), + gettext_noop("When creating new objects in transactions this setting is used to " + "determine the behavior for propagating. When objects are created " + "in a multi-statement transaction block Citus needs to switch to " + "sequential mode (if not already) to make sure the objects are " + "visible to later statements on shards. The switch to sequential is " + "not always desired. By changing this behavior the user can trade " + "off performance for full transactional consistency on the creation " + "of new objects."), + &CreateObjectPropagationMode, + CREATE_OBJECT_PROPAGATION_DEFERRED, create_object_propagation_options, + PGC_USERSET, + GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.defer_drop_after_shard_move", gettext_noop("When enabled a shard move will mark the original shards " diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h index 7c926fcf1..615a7c6d2 100644 --- a/src/include/distributed/commands/utility_hook.h +++ b/src/include/distributed/commands/utility_hook.h @@ -22,6 +22,13 @@ #include "distributed/version_compat.h" #include "distributed/worker_transaction.h" +typedef enum +{ + CREATE_OBJECT_PROPAGATION_DEFERRED = 0, + CREATE_OBJECT_PROPAGATION_AUTOMATIC = 1, + CREATE_OBJECT_PROPAGATION_IMMEDIATE = 2 +} CreateObjectPropagationOptions; + typedef enum { PROPSETCMD_INVALID = -1, @@ -32,6 +39,7 @@ typedef enum } PropSetCmdBehavior; extern PropSetCmdBehavior PropagateSetCommands; extern bool EnableDDLPropagation; +extern int CreateObjectPropagationMode; extern bool EnableCreateTypePropagation; extern bool EnableAlterRolePropagation; extern bool EnableAlterRoleSetPropagation; diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 75e76ec8d..c03b3abe7 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -251,6 +251,7 @@ extern TableConversionReturn * UndistributeTable(TableConversionParameters *para extern void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target); extern List * GetDistributableDependenciesForObject(const ObjectAddress *target); extern bool ShouldPropagate(void); +extern bool ShouldPropagateCreateInCoordinatedTransction(void); extern bool ShouldPropagateObject(const ObjectAddress *address); extern List * ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort); diff --git a/src/test/regress/expected/distributed_types.out b/src/test/regress/expected/distributed_types.out index dcf8dd8b2..2e2ef9c1f 100644 --- a/src/test/regress/expected/distributed_types.out +++ b/src/test/regress/expected/distributed_types.out @@ -453,6 +453,107 @@ SELECT * FROM field_indirection_test_2 ORDER BY 1,2,3; 8 | (10," text10",20) | (40,50) (2 rows) +-- test different ddl propagation modes +SET citus.create_object_propagation TO deferred; +BEGIN; +CREATE TYPE deferred_type AS (a int); +SHOW citus.multi_shard_modify_mode; + citus.multi_shard_modify_mode +--------------------------------------------------------------------- + parallel +(1 row) + +CREATE TABLE deferred_table(a int,b deferred_type); +SELECT create_distributed_table('deferred_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SHOW citus.multi_shard_modify_mode; + citus.multi_shard_modify_mode +--------------------------------------------------------------------- + parallel +(1 row) + +COMMIT; +SET citus.create_object_propagation TO automatic; +BEGIN; +CREATE TYPE automatic_type AS (a int); +SHOW citus.multi_shard_modify_mode; + citus.multi_shard_modify_mode +--------------------------------------------------------------------- + sequential +(1 row) + +CREATE TABLE automatic_table(a int,b automatic_type); +SELECT create_distributed_table('automatic_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SHOW citus.multi_shard_modify_mode; + citus.multi_shard_modify_mode +--------------------------------------------------------------------- + sequential +(1 row) + +COMMIT; +SET citus.create_object_propagation TO automatic; +BEGIN; +-- force parallel execution by preceding with a analytical query +SET LOCAL citus.force_max_query_parallelization TO on; +SELECT count(*) FROM automatic_table; + count +--------------------------------------------------------------------- + 0 +(1 row) + +CREATE TYPE automatic2_type AS (a int); +SHOW citus.multi_shard_modify_mode; + citus.multi_shard_modify_mode +--------------------------------------------------------------------- + parallel +(1 row) + +CREATE TABLE automatic2_table(a int,b automatic2_type); +SELECT create_distributed_table('automatic2_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SHOW citus.multi_shard_modify_mode; + citus.multi_shard_modify_mode +--------------------------------------------------------------------- + parallel +(1 row) + +COMMIT; +SET citus.create_object_propagation TO immediate; +BEGIN; +CREATE TYPE immediate_type AS (a int); +SHOW citus.multi_shard_modify_mode; + citus.multi_shard_modify_mode +--------------------------------------------------------------------- + sequential +(1 row) + +CREATE TABLE immediate_table(a int,b immediate_type); +SELECT create_distributed_table('immediate_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SHOW citus.multi_shard_modify_mode; + citus.multi_shard_modify_mode +--------------------------------------------------------------------- + sequential +(1 row) + +COMMIT; -- clear objects SET client_min_messages TO error; -- suppress cascading objects dropping DROP SCHEMA type_tests CASCADE; diff --git a/src/test/regress/sql/distributed_types.sql b/src/test/regress/sql/distributed_types.sql index a136e2fa8..23d84d26b 100644 --- a/src/test/regress/sql/distributed_types.sql +++ b/src/test/regress/sql/distributed_types.sql @@ -281,6 +281,47 @@ UPDATE field_indirection_test_2 SET (ct2_col, ct1_col) = ('(10, "text10", 20)', SELECT * FROM field_indirection_test_2 ORDER BY 1,2,3; +-- test different ddl propagation modes +SET citus.create_object_propagation TO deferred; +BEGIN; +CREATE TYPE deferred_type AS (a int); +SHOW citus.multi_shard_modify_mode; +CREATE TABLE deferred_table(a int,b deferred_type); +SELECT create_distributed_table('deferred_table', 'a'); +SHOW citus.multi_shard_modify_mode; +COMMIT; + +SET citus.create_object_propagation TO automatic; +BEGIN; +CREATE TYPE automatic_type AS (a int); +SHOW citus.multi_shard_modify_mode; +CREATE TABLE automatic_table(a int,b automatic_type); +SELECT create_distributed_table('automatic_table', 'a'); +SHOW citus.multi_shard_modify_mode; +COMMIT; + +SET citus.create_object_propagation TO automatic; +BEGIN; +-- force parallel execution by preceding with a analytical query +SET LOCAL citus.force_max_query_parallelization TO on; +SELECT count(*) FROM automatic_table; + +CREATE TYPE automatic2_type AS (a int); +SHOW citus.multi_shard_modify_mode; +CREATE TABLE automatic2_table(a int,b automatic2_type); +SELECT create_distributed_table('automatic2_table', 'a'); +SHOW citus.multi_shard_modify_mode; +COMMIT; + +SET citus.create_object_propagation TO immediate; +BEGIN; +CREATE TYPE immediate_type AS (a int); +SHOW citus.multi_shard_modify_mode; +CREATE TABLE immediate_table(a int,b immediate_type); +SELECT create_distributed_table('immediate_table', 'a'); +SHOW citus.multi_shard_modify_mode; +COMMIT; + -- clear objects SET client_min_messages TO error; -- suppress cascading objects dropping DROP SCHEMA type_tests CASCADE;