diff --git a/src/backend/distributed/commands/collation.c b/src/backend/distributed/commands/collation.c index 7f047ec1d..12bf1404a 100644 --- a/src/backend/distributed/commands/collation.c +++ b/src/backend/distributed/commands/collation.c @@ -530,11 +530,14 @@ PreprocessDefineCollationStmt(Node *node, const char *queryString, { Assert(castNode(DefineStmt, node)->kind == OBJECT_COLLATION); - if (ShouldPropagateDefineCollationStmt()) + if (!ShouldPropagateDefineCollationStmt()) { - EnsureCoordinator(); + return NIL; } + EnsureCoordinator(); + EnsureSequentialMode(OBJECT_COLLATION); + return NIL; } @@ -575,8 +578,7 @@ PostprocessDefineCollationStmt(Node *node, const char *queryString) * ShouldPropagateDefineCollationStmt checks if collation define * statement should be propagated. Don't propagate if: * - metadata syncing if off - * - statement is part of a multi stmt transaction and the multi shard connection - * type is not sequential + * - create statement should not be propagated according to the ddl propagation policy */ static bool ShouldPropagateDefineCollationStmt() @@ -586,8 +588,7 @@ ShouldPropagateDefineCollationStmt() return false; } - if (IsMultiStatementTransaction() && - MultiShardConnectionType != SEQUENTIAL_CONNECTION) + if (!ShouldPropagateCreateInCoordinatedTransction()) { return false; } diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index c1bd4c340..fe6e651fa 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -20,6 +20,8 @@ #include "distributed/metadata/dependency.h" #include "distributed/metadata/distobject.h" #include "distributed/metadata_sync.h" +#include "distributed/multi_executor.h" +#include "distributed/relation_access_tracking.h" #include "distributed/remote_commands.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" @@ -459,6 +461,88 @@ ShouldPropagate(void) } +/* + * 
ShouldPropagateCreateInCoordinatedTransction returns, based on the current state of the + * session and policies, if Citus needs to propagate the creation of new objects. + * + * Creation of objects on other nodes could be postponed till the object is actually used + * in a sharded object (e.g. distributed table or index on a distributed table). In certain + * use cases the opportunity for parallelism in a transaction block is preferred. When + * configured like that the creation of an object might be postponed and backfilled till + * the object is actually used. + */ +bool +ShouldPropagateCreateInCoordinatedTransction() +{ + if (!IsMultiStatementTransaction()) + { + /* + * If we are in a single statement transaction we will always propagate the + * creation of objects. There are no downsides in regard to performance or + * transactional limitations. These only arise with transaction blocks consisting + * of multiple statements. + */ + return true; + } + + if (MultiShardConnectionType == SEQUENTIAL_CONNECTION) + { + /* + * If we are in a transaction that is already switched to sequential, either by + * the user, or automatically by another command, we will always propagate the + * creation of new objects to the workers. + * + * This guarantees no strange anomalies when the transaction aborts or on + * visibility of the newly created object. + */ + return true; + } + + switch (CreateObjectPropagationMode) + { + case CREATE_OBJECT_PROPAGATION_DEFERRED: + { + /* + * We prefer parallelism at this point. Since we did not already return while + * checking for sequential mode we are still in parallel mode. We don't want + * to switch that now, thus not propagating the creation. + */ + return false; + } + + case CREATE_OBJECT_PROPAGATION_AUTOMATIC: + { + /* + * When we run in automatic mode we want to switch to sequential mode, only + * if this would _not_ give an error to the user. 
Meaning, we either are + * already in sequential mode (checked earlier), or there has been no parallel + * execution in the current transaction block. + * + * If switching to sequential would throw an error we would stay in parallel + * mode while creating new objects. We will rely on Citus' mechanism to ensure + * the existence if the object would be used in the same transaction. + */ + if (ParallelQueryExecutedInTransaction()) + { + return false; + } + + return true; + } + + case CREATE_OBJECT_PROPAGATION_IMMEDIATE: + { + return true; + } + + default: + { + elog(ERROR, "unsupported ddl propagation mode"); + } + } +} + + /* * ShouldPropagateObject determines if we should be propagating DDLs based * on their object address. diff --git a/src/backend/distributed/commands/extension.c b/src/backend/distributed/commands/extension.c index fac8a783a..f585b6a67 100644 --- a/src/backend/distributed/commands/extension.c +++ b/src/backend/distributed/commands/extension.c @@ -136,11 +136,8 @@ PostprocessCreateExtensionStmt(Node *node, const char *queryString) return NIL; } - /* - * If the extension command is a part of a multi-statement transaction, - * do not propagate it - */ - if (IsMultiStatementTransaction()) + /* check creation against multi-statement transaction policy */ + if (!ShouldPropagateCreateInCoordinatedTransction()) { return NIL; } diff --git a/src/backend/distributed/commands/foreign_server.c b/src/backend/distributed/commands/foreign_server.c index ad1802ddb..0777814df 100644 --- a/src/backend/distributed/commands/foreign_server.c +++ b/src/backend/distributed/commands/foreign_server.c @@ -17,6 +17,7 @@ #include "distributed/listutils.h" #include "distributed/metadata/distobject.h" #include "distributed/metadata_sync.h" +#include "distributed/multi_executor.h" #include "distributed/worker_transaction.h" #include "foreign/foreign.h" #include "nodes/makefuncs.h" @@ -41,7 +42,14 @@ PreprocessCreateForeignServerStmt(Node *node, const char *queryString, return 
NIL; } + /* check creation against multi-statement transaction policy */ + if (!ShouldPropagateCreateInCoordinatedTransction()) + { + return NIL; + } + EnsureCoordinator(); + EnsureSequentialMode(OBJECT_FOREIGN_SERVER); char *sql = DeparseTreeNode(node); @@ -209,7 +217,18 @@ PreprocessDropForeignServerStmt(Node *node, const char *queryString, List * PostprocessCreateForeignServerStmt(Node *node, const char *queryString) { - bool missingOk = false; + if (!ShouldPropagate()) + { + return NIL; + } + + /* check creation against multi-statement transaction policy */ + if (!ShouldPropagateCreateInCoordinatedTransction()) + { + return NIL; + } + + const bool missingOk = false; ObjectAddress address = GetObjectAddressFromParseTree(node, missingOk); EnsureDependenciesExistOnAllNodes(&address); @@ -224,8 +243,14 @@ PostprocessCreateForeignServerStmt(Node *node, const char *queryString) List * PostprocessAlterForeignServerOwnerStmt(Node *node, const char *queryString) { - bool missingOk = false; + const bool missingOk = false; ObjectAddress address = GetObjectAddressFromParseTree(node, missingOk); + + if (!ShouldPropagateObject(&address)) + { + return NIL; + } + EnsureDependenciesExistOnAllNodes(&address); return NIL; diff --git a/src/backend/distributed/commands/schema.c b/src/backend/distributed/commands/schema.c index 77f3fcc32..635e7adde 100644 --- a/src/backend/distributed/commands/schema.c +++ b/src/backend/distributed/commands/schema.c @@ -367,8 +367,8 @@ ShouldPropagateCreateSchemaStmt() return false; } - if (IsMultiStatementTransaction() && - MultiShardConnectionType != SEQUENTIAL_CONNECTION) + /* check creation against multi-statement transaction policy */ + if (!ShouldPropagateCreateInCoordinatedTransction()) { return false; } diff --git a/src/backend/distributed/commands/text_search.c b/src/backend/distributed/commands/text_search.c index be78057f7..53080c42b 100644 --- a/src/backend/distributed/commands/text_search.c +++ 
b/src/backend/distributed/commands/text_search.c @@ -73,16 +73,10 @@ PostprocessCreateTextSearchConfigurationStmt(Node *node, const char *queryString return NIL; } - /* - * If the create command is a part of a multi-statement transaction that is not in - * sequential mode, don't propagate. Instead we will rely on back filling. - */ - if (IsMultiStatementTransaction()) + /* check creation against multi-statement transaction policy */ + if (!ShouldPropagateCreateInCoordinatedTransction()) { - if (MultiShardConnectionType != SEQUENTIAL_CONNECTION) - { - return NIL; - } + return NIL; } EnsureCoordinator(); diff --git a/src/backend/distributed/commands/type.c b/src/backend/distributed/commands/type.c index 47dd0c307..c124388d4 100644 --- a/src/backend/distributed/commands/type.c +++ b/src/backend/distributed/commands/type.c @@ -1151,7 +1151,7 @@ ShouldPropagateTypeCreate() * this type will be used as a column in a table that will be created and distributed * in this same transaction. */ - if (IsMultiStatementTransaction()) + if (!ShouldPropagateCreateInCoordinatedTransction()) { return false; } diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 1611da462..91e02a8ff 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -78,6 +78,7 @@ #include "utils/syscache.h" bool EnableDDLPropagation = true; /* ddl propagation is enabled */ +int CreateObjectPropagationMode = CREATE_OBJECT_PROPAGATION_DEFERRED; PropSetCmdBehavior PropagateSetCommands = PROPSETCMD_NONE; /* SET prop off */ static bool shouldInvalidateForeignKeyGraph = false; static int activeAlterTables = 0; diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 28498e0f2..f43235aac 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -763,6 +763,11 @@ 
GetObjectTypeString(ObjectType objType) return "extension"; } + case OBJECT_FOREIGN_SERVER: + { + return "foreign server"; + } + case OBJECT_FUNCTION: { return "function"; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index e6b98a843..7897cbefe 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -218,6 +218,13 @@ static const struct config_enum_entry explain_analyze_sort_method_options[] = { { NULL, 0, false } }; +static const struct config_enum_entry create_object_propagation_options[] = { + {"deferred", CREATE_OBJECT_PROPAGATION_DEFERRED, false}, + {"automatic", CREATE_OBJECT_PROPAGATION_AUTOMATIC, false}, + {"immediate", CREATE_OBJECT_PROPAGATION_IMMEDIATE, false}, + {NULL, 0, false} +}; + /* *INDENT-ON* */ @@ -669,6 +676,24 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomEnumVariable( + "citus.create_object_propagation", + gettext_noop("Controls the behavior of CREATE statements in transactions for " + "supported objects"), + gettext_noop("When creating new objects in transactions this setting is used to " + "determine the behavior for propagating. When objects are created " + "in a multi-statement transaction block Citus needs to switch to " + "sequential mode (if not already) to make sure the objects are " + "visible to later statements on shards. The switch to sequential is " + "not always desired. 
By changing this behavior the user can trade " + "off performance for full transactional consistency on the creation " + "of new objects."), + &CreateObjectPropagationMode, + CREATE_OBJECT_PROPAGATION_DEFERRED, create_object_propagation_options, + PGC_USERSET, + GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.defer_drop_after_shard_move", gettext_noop("When enabled a shard move will mark the original shards " diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h index 7c926fcf1..615a7c6d2 100644 --- a/src/include/distributed/commands/utility_hook.h +++ b/src/include/distributed/commands/utility_hook.h @@ -22,6 +22,13 @@ #include "distributed/version_compat.h" #include "distributed/worker_transaction.h" +typedef enum +{ + CREATE_OBJECT_PROPAGATION_DEFERRED = 0, + CREATE_OBJECT_PROPAGATION_AUTOMATIC = 1, + CREATE_OBJECT_PROPAGATION_IMMEDIATE = 2 +} CreateObjectPropagationOptions; + typedef enum { PROPSETCMD_INVALID = -1, @@ -32,6 +39,7 @@ typedef enum } PropSetCmdBehavior; extern PropSetCmdBehavior PropagateSetCommands; extern bool EnableDDLPropagation; +extern int CreateObjectPropagationMode; extern bool EnableCreateTypePropagation; extern bool EnableAlterRolePropagation; extern bool EnableAlterRoleSetPropagation; diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 75e76ec8d..c03b3abe7 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -251,6 +251,7 @@ extern TableConversionReturn * UndistributeTable(TableConversionParameters *para extern void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target); extern List * GetDistributableDependenciesForObject(const ObjectAddress *target); extern bool ShouldPropagate(void); +extern bool ShouldPropagateCreateInCoordinatedTransction(void); extern bool ShouldPropagateObject(const ObjectAddress *address); extern List * 
ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort); diff --git a/src/test/regress/expected/distributed_types.out b/src/test/regress/expected/distributed_types.out index dcf8dd8b2..2e2ef9c1f 100644 --- a/src/test/regress/expected/distributed_types.out +++ b/src/test/regress/expected/distributed_types.out @@ -453,6 +453,107 @@ SELECT * FROM field_indirection_test_2 ORDER BY 1,2,3; 8 | (10," text10",20) | (40,50) (2 rows) +-- test different ddl propagation modes +SET citus.create_object_propagation TO deferred; +BEGIN; +CREATE TYPE deferred_type AS (a int); +SHOW citus.multi_shard_modify_mode; + citus.multi_shard_modify_mode +--------------------------------------------------------------------- + parallel +(1 row) + +CREATE TABLE deferred_table(a int,b deferred_type); +SELECT create_distributed_table('deferred_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SHOW citus.multi_shard_modify_mode; + citus.multi_shard_modify_mode +--------------------------------------------------------------------- + parallel +(1 row) + +COMMIT; +SET citus.create_object_propagation TO automatic; +BEGIN; +CREATE TYPE automatic_type AS (a int); +SHOW citus.multi_shard_modify_mode; + citus.multi_shard_modify_mode +--------------------------------------------------------------------- + sequential +(1 row) + +CREATE TABLE automatic_table(a int,b automatic_type); +SELECT create_distributed_table('automatic_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SHOW citus.multi_shard_modify_mode; + citus.multi_shard_modify_mode +--------------------------------------------------------------------- + sequential +(1 row) + +COMMIT; +SET citus.create_object_propagation TO automatic; +BEGIN; +-- force parallel execution by preceding with a analytical query +SET LOCAL citus.force_max_query_parallelization TO on; +SELECT count(*) FROM 
automatic_table; + count +--------------------------------------------------------------------- + 0 +(1 row) + +CREATE TYPE automatic2_type AS (a int); +SHOW citus.multi_shard_modify_mode; + citus.multi_shard_modify_mode +--------------------------------------------------------------------- + parallel +(1 row) + +CREATE TABLE automatic2_table(a int,b automatic2_type); +SELECT create_distributed_table('automatic2_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SHOW citus.multi_shard_modify_mode; + citus.multi_shard_modify_mode +--------------------------------------------------------------------- + parallel +(1 row) + +COMMIT; +SET citus.create_object_propagation TO immediate; +BEGIN; +CREATE TYPE immediate_type AS (a int); +SHOW citus.multi_shard_modify_mode; + citus.multi_shard_modify_mode +--------------------------------------------------------------------- + sequential +(1 row) + +CREATE TABLE immediate_table(a int,b immediate_type); +SELECT create_distributed_table('immediate_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SHOW citus.multi_shard_modify_mode; + citus.multi_shard_modify_mode +--------------------------------------------------------------------- + sequential +(1 row) + +COMMIT; -- clear objects SET client_min_messages TO error; -- suppress cascading objects dropping DROP SCHEMA type_tests CASCADE; diff --git a/src/test/regress/sql/distributed_types.sql b/src/test/regress/sql/distributed_types.sql index a136e2fa8..23d84d26b 100644 --- a/src/test/regress/sql/distributed_types.sql +++ b/src/test/regress/sql/distributed_types.sql @@ -281,6 +281,47 @@ UPDATE field_indirection_test_2 SET (ct2_col, ct1_col) = ('(10, "text10", 20)', SELECT * FROM field_indirection_test_2 ORDER BY 1,2,3; +-- test different ddl propagation modes +SET citus.create_object_propagation TO deferred; +BEGIN; +CREATE TYPE 
deferred_type AS (a int); +SHOW citus.multi_shard_modify_mode; +CREATE TABLE deferred_table(a int,b deferred_type); +SELECT create_distributed_table('deferred_table', 'a'); +SHOW citus.multi_shard_modify_mode; +COMMIT; + +SET citus.create_object_propagation TO automatic; +BEGIN; +CREATE TYPE automatic_type AS (a int); +SHOW citus.multi_shard_modify_mode; +CREATE TABLE automatic_table(a int,b automatic_type); +SELECT create_distributed_table('automatic_table', 'a'); +SHOW citus.multi_shard_modify_mode; +COMMIT; + +SET citus.create_object_propagation TO automatic; +BEGIN; +-- force parallel execution by preceding with a analytical query +SET LOCAL citus.force_max_query_parallelization TO on; +SELECT count(*) FROM automatic_table; + +CREATE TYPE automatic2_type AS (a int); +SHOW citus.multi_shard_modify_mode; +CREATE TABLE automatic2_table(a int,b automatic2_type); +SELECT create_distributed_table('automatic2_table', 'a'); +SHOW citus.multi_shard_modify_mode; +COMMIT; + +SET citus.create_object_propagation TO immediate; +BEGIN; +CREATE TYPE immediate_type AS (a int); +SHOW citus.multi_shard_modify_mode; +CREATE TABLE immediate_table(a int,b immediate_type); +SELECT create_distributed_table('immediate_table', 'a'); +SHOW citus.multi_shard_modify_mode; +COMMIT; + -- clear objects SET client_min_messages TO error; -- suppress cascading objects dropping DROP SCHEMA type_tests CASCADE;