From 0411a98c99f5bfe775a3c89e29c5863d2aa861d1 Mon Sep 17 00:00:00 2001 From: Ahmet Gedemenli Date: Mon, 14 Feb 2022 18:38:21 +0300 Subject: [PATCH] Refactor EnsureSequentialMode functions (#5704) --- src/backend/distributed/commands/collation.c | 50 +--------- src/backend/distributed/commands/database.c | 39 +------- src/backend/distributed/commands/extension.c | 47 +--------- src/backend/distributed/commands/function.c | 51 ++-------- src/backend/distributed/commands/schema.c | 45 +-------- src/backend/distributed/commands/type.c | 60 ++---------- .../distributed/executor/multi_executor.c | 94 +++++++++++++++++++ src/include/distributed/multi_executor.h | 1 + .../regress/expected/alter_database_owner.out | 4 +- .../expected/forcedelegation_functions.out | 34 +++---- .../expected/local_dist_join_mixed.out | 2 + .../expected/multi_function_in_join.out | 2 +- .../expected/multi_function_in_join_0.out | 2 +- .../expected/multi_mx_add_coordinator.out | 2 +- src/test/regress/expected/multi_mx_call.out | 6 +- src/test/regress/expected/multi_mx_call_0.out | 6 +- .../multi_mx_function_call_delegation.out | 10 +- .../multi_mx_function_call_delegation_0.out | 10 +- .../regress/expected/multi_schema_support.out | 4 +- .../expected/non_colocated_subquery_joins.out | 2 + 20 files changed, 167 insertions(+), 304 deletions(-) diff --git a/src/backend/distributed/commands/collation.c b/src/backend/distributed/commands/collation.c index ddbe63f52..d712c18ab 100644 --- a/src/backend/distributed/commands/collation.c +++ b/src/backend/distributed/commands/collation.c @@ -37,7 +37,6 @@ static char * CreateCollationDDLInternal(Oid collationId, Oid *collowner, char **quotedCollationName); static List * FilterNameListForDistributedCollations(List *objects, bool missing_ok, List **addresses); -static void EnsureSequentialModeForCollationDDL(void); /* @@ -256,7 +255,7 @@ PreprocessDropCollationStmt(Node *node, const char *queryString, char *dropStmtSql = DeparseTreeNode((Node *) stmt); stmt->objects = oldCollations; - EnsureSequentialModeForCollationDDL(); + EnsureSequentialMode(OBJECT_COLLATION); /* to prevent recursion with mx we disable ddl propagation */ List *commands = list_make3(DISABLE_DDL_PROPAGATION, @@ -292,7 +291,7 @@ PreprocessAlterCollationOwnerStmt(Node *node, const char *queryString, QualifyTreeNode((Node *) stmt); char *sql = DeparseTreeNode((Node *) stmt); - EnsureSequentialModeForCollationDDL(); + EnsureSequentialMode(OBJECT_COLLATION); List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, ENABLE_DDL_PROPAGATION); @@ -328,7 +327,7 @@ PreprocessRenameCollationStmt(Node *node, const char *queryString, /* deparse sql*/ char *renameStmtSql = DeparseTreeNode((Node *) stmt); - EnsureSequentialModeForCollationDDL(); + EnsureSequentialMode(OBJECT_COLLATION); /* to prevent recursion with mx we disable ddl propagation */ List *commands = list_make3(DISABLE_DDL_PROPAGATION, @@ -363,7 +362,7 @@ PreprocessAlterCollationSchemaStmt(Node *node, const char *queryString, QualifyTreeNode((Node *) stmt); char *sql = DeparseTreeNode((Node *) stmt); - EnsureSequentialModeForCollationDDL(); + EnsureSequentialMode(OBJECT_COLLATION); List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, @@ -453,47 +452,6 @@ AlterCollationSchemaStmtObjectAddress(Node *node, bool missing_ok) } -/* - * EnsureSequentialModeForCollationDDL makes sure that the current transaction is already in - * sequential mode, or can still safely be put in sequential mode, it errors if that is - * not possible. The error contains information for the user to retry the transaction with - * sequential mode set from the beginning. - * - * As collations are node scoped objects there exists only 1 instance of the collation used by - * potentially multiple shards. To make sure all shards in the transaction can interact - * with the type the type needs to be visible on all connections used by the transaction, - * meaning we can only use 1 connection per node. - */ -static void -EnsureSequentialModeForCollationDDL(void) -{ - if (!IsTransactionBlock()) - { - /* we do not need to switch to sequential mode if we are not in a transaction */ - return; - } - - if (ParallelQueryExecutedInTransaction()) - { - ereport(ERROR, (errmsg("cannot create or modify collation because there was a " - "parallel operation on a distributed table in the " - "transaction"), - errdetail("When creating or altering a collation, Citus needs to " - "perform all operations over a single connection per " - "node to ensure consistency."), - errhint("Try re-running the transaction with " - "\"SET LOCAL citus.multi_shard_modify_mode TO " - "\'sequential\';\""))); - } - - ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), - errdetail("Collation is created or altered. To make sure subsequent " - "commands see the collation correctly we need to make sure to " - "use only one connection for all future commands"))); - SetLocalMultiShardModifyModeToSequential(); -} - - /* * GenerateBackupNameForCollationCollision generates a new collation name for an existing collation. * The name is generated in such a way that the new name doesn't overlap with an existing collation diff --git a/src/backend/distributed/commands/database.c b/src/backend/distributed/commands/database.c index b4fb15110..59902b038 100644 --- a/src/backend/distributed/commands/database.c +++ b/src/backend/distributed/commands/database.c @@ -29,7 +29,6 @@ #include "distributed/relation_access_tracking.h" #include "distributed/worker_transaction.h" -static void EnsureSequentialModeForDatabaseDDL(void); static AlterOwnerStmt * RecreateAlterDatabaseOwnerStmt(Oid databaseOid); static Oid get_database_owner(Oid db_oid); @@ -66,7 +65,7 @@ PreprocessAlterDatabaseOwnerStmt(Node *node, const char *queryString, QualifyTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt); - EnsureSequentialModeForDatabaseDDL(); + EnsureSequentialMode(OBJECT_DATABASE); List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, ENABLE_DDL_PROPAGATION); @@ -177,39 +176,3 @@ get_database_owner(Oid db_oid) return dba; } - - -/* - * EnsureSequentialModeForDatabaseDDL makes sure that the current transaction is already - * in sequential mode, or can still safely be put in sequential mode, it errors if that is - * not possible. The error contains information for the user to retry the transaction with - * sequential mode set from the beginning. - */ -static void -EnsureSequentialModeForDatabaseDDL(void) -{ - if (!IsTransactionBlock()) - { - /* we do not need to switch to sequential mode if we are not in a transaction */ - return; - } - - if (ParallelQueryExecutedInTransaction()) - { - ereport(ERROR, (errmsg("cannot create or modify database because there was a " - "parallel operation on a distributed table in the " - "transaction"), - errdetail("When creating or altering a database, Citus needs to " - "perform all operations over a single connection per " - "node to ensure consistency."), - errhint("Try re-running the transaction with " - "\"SET LOCAL citus.multi_shard_modify_mode TO " - "\'sequential\';\""))); - } - - ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), - errdetail("Database is created or altered. To make sure subsequent " - "commands see the type correctly we need to make sure to " - "use only one connection for all future commands"))); - SetLocalMultiShardModifyModeToSequential(); -} diff --git a/src/backend/distributed/commands/extension.c b/src/backend/distributed/commands/extension.c index ef5b6c1b1..dc1363a65 100644 --- a/src/backend/distributed/commands/extension.c +++ b/src/backend/distributed/commands/extension.c @@ -37,7 +37,6 @@ static void AddSchemaFieldIfMissing(CreateExtensionStmt *stmt); static List * FilterDistributedExtensions(List *extensionObjectList); static List * ExtensionNameListToObjectAddressList(List *extensionObjectList); static void MarkExistingObjectDependenciesDistributedIfSupported(void); -static void EnsureSequentialModeForExtensionDDL(void); static bool ShouldPropagateExtensionCommand(Node *parseTree); static bool IsAlterExtensionSetSchemaCitus(Node *parseTree); static Node * RecreateExtensionStmt(Oid extensionOid); @@ -163,7 +162,7 @@ PostprocessCreateExtensionStmt(Node *node, const char *queryString) * Make sure that the current transaction is already in sequential mode, * or can still safely be put in sequential mode */ - EnsureSequentialModeForExtensionDDL(); + EnsureSequentialMode(OBJECT_EXTENSION); /* * Here we append "schema" field to the "options" list (if not specified) @@ -274,7 +273,7 @@ PreprocessDropExtensionStmt(Node *node, const char *queryString, * Make sure that the current transaction is already in sequential mode, * or can still safely be put in sequential mode */ - EnsureSequentialModeForExtensionDDL(); + EnsureSequentialMode(OBJECT_EXTENSION); List *distributedExtensionAddresses = ExtensionNameListToObjectAddressList( distributedExtensions); @@ -409,7 +408,7 @@ PreprocessAlterExtensionSchemaStmt(Node *node, const char *queryString, * Make sure that the current transaction is already in sequential mode, * or can still safely be put in sequential mode */ - EnsureSequentialModeForExtensionDDL(); + EnsureSequentialMode(OBJECT_EXTENSION); const char *alterExtensionStmtSql = DeparseTreeNode(node); @@ -478,7 +477,7 @@ PreprocessAlterExtensionUpdateStmt(Node *node, const char *queryString, * Make sure that the current transaction is already in sequential mode, * or can still safely be put in sequential mode */ - EnsureSequentialModeForExtensionDDL(); + EnsureSequentialMode(OBJECT_EXTENSION); const char *alterExtensionStmtSql = DeparseTreeNode((Node *) alterExtensionStmt); @@ -603,44 +602,6 @@ PreprocessAlterExtensionContentsStmt(Node *node, const char *queryString, } -/* - * EnsureSequentialModeForExtensionDDL makes sure that the current transaction is already in - * sequential mode, or can still safely be put in sequential mode, it errors if that is - * not possible. The error contains information for the user to retry the transaction with - * sequential mode set from the beginning. - * - * As extensions are node scoped objects there exists only 1 instance of the - * extension used by potentially multiple shards. To make sure all shards in - * the transaction can interact with the extension the extension needs to be - * visible on all connections used by the transaction, meaning we can only use - * 1 connection per node. - */ -static void -EnsureSequentialModeForExtensionDDL(void) -{ - if (ParallelQueryExecutedInTransaction()) - { - ereport(ERROR, (errmsg("cannot run extension command because there was a " - "parallel operation on a distributed table in the " - "transaction"), - errdetail( - "When running command on/for a distributed extension, Citus needs to " - "perform all operations over a single connection per " - "node to ensure consistency."), - errhint("Try re-running the transaction with " - "\"SET LOCAL citus.multi_shard_modify_mode TO " - "\'sequential\';\""))); - } - - ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), - errdetail( - "A command for a distributed extension is run. To make sure subsequent " - "commands see the type correctly we need to make sure to " - "use only one connection for all future commands"))); - SetLocalMultiShardModifyModeToSequential(); -} - - /* * ShouldPropagateExtensionCommand determines whether to propagate an extension * command to the worker nodes. diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index ab03fe77a..95b11fd0c 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -77,7 +77,6 @@ static int GetFunctionColocationId(Oid functionOid, char *colocateWithName, Oid static void EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid distributionColumnType, Oid sourceRelationId); -static void EnsureSequentialModeForFunctionDDL(void); static bool ShouldPropagateCreateFunction(CreateFunctionStmt *stmt); static bool ShouldPropagateAlterFunction(const ObjectAddress *address); static bool ShouldAddFunctionSignature(FunctionParameterMode mode); @@ -207,7 +206,7 @@ create_distributed_function(PG_FUNCTION_ARGS) * when we allow propagation within a transaction block we should make sure * to only allow this in sequential mode. */ - EnsureSequentialModeForFunctionDDL(); + EnsureSequentialMode(OBJECT_FUNCTION); EnsureDependenciesExistOnAllNodes(&functionAddress); @@ -1169,42 +1168,6 @@ GetAggregateDDLCommand(const RegProcedure funcOid, bool useCreateOrReplace) } -/* - * EnsureSequentialModeForFunctionDDL makes sure that the current transaction is already in - * sequential mode, or can still safely be put in sequential mode, it errors if that is - * not possible. The error contains information for the user to retry the transaction with - * sequential mode set from the beginning. - * - * As functions are node scoped objects there exists only 1 instance of the function used by - * potentially multiple shards. To make sure all shards in the transaction can interact - * with the function the function needs to be visible on all connections used by the transaction, - * meaning we can only use 1 connection per node. - */ -static void -EnsureSequentialModeForFunctionDDL(void) -{ - if (ParallelQueryExecutedInTransaction()) - { - ereport(ERROR, (errmsg("cannot create function because there was a " - "parallel operation on a distributed table in the " - "transaction"), - errdetail("When creating a distributed function, Citus needs to " - "perform all operations over a single connection per " - "node to ensure consistency."), - errhint("Try re-running the transaction with " - "\"SET LOCAL citus.multi_shard_modify_mode TO " - "\'sequential\';\""))); - } - - ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), - errdetail( - "A distributed function is created. To make sure subsequent " - "commands see the type correctly we need to make sure to " - "use only one connection for all future commands"))); - SetLocalMultiShardModifyModeToSequential(); -} - - /* * ShouldPropagateCreateFunction tests if we need to propagate a CREATE FUNCTION * statement. We only propagate replace's of distributed functions to keep the function on @@ -1313,7 +1276,7 @@ PreprocessCreateFunctionStmt(Node *node, const char *queryString, EnsureCoordinator(); - EnsureSequentialModeForFunctionDDL(); + EnsureSequentialMode(OBJECT_FUNCTION); /* * ddl jobs will be generated during the Processing phase as we need the function to @@ -1432,7 +1395,7 @@ PreprocessAlterFunctionStmt(Node *node, const char *queryString, EnsureCoordinator(); ErrorIfUnsupportedAlterFunctionStmt(stmt); - EnsureSequentialModeForFunctionDDL(); + EnsureSequentialMode(OBJECT_FUNCTION); QualifyTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt); @@ -1466,7 +1429,7 @@ PreprocessRenameFunctionStmt(Node *node, const char *queryString, } EnsureCoordinator(); - EnsureSequentialModeForFunctionDDL(); + EnsureSequentialMode(OBJECT_FUNCTION); QualifyTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt); @@ -1498,7 +1461,7 @@ PreprocessAlterFunctionSchemaStmt(Node *node, const char *queryString, } EnsureCoordinator(); - EnsureSequentialModeForFunctionDDL(); + EnsureSequentialMode(OBJECT_FUNCTION); QualifyTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt); @@ -1531,7 +1494,7 @@ PreprocessAlterFunctionOwnerStmt(Node *node, const char *queryString, } EnsureCoordinator(); - EnsureSequentialModeForFunctionDDL(); + EnsureSequentialMode(OBJECT_FUNCTION); QualifyTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt); @@ -1621,7 +1584,7 @@ PreprocessDropFunctionStmt(Node *node, const char *queryString, * types, so we block the call. */ EnsureCoordinator(); - EnsureSequentialModeForFunctionDDL(); + EnsureSequentialMode(OBJECT_FUNCTION); /* remove the entries for the distributed objects on dropping */ ObjectAddress *address = NULL; diff --git a/src/backend/distributed/commands/schema.c b/src/backend/distributed/commands/schema.c index adb26b5e7..e0c0f2b11 100644 --- a/src/backend/distributed/commands/schema.c +++ b/src/backend/distributed/commands/schema.c @@ -42,7 +42,6 @@ static ObjectAddress GetObjectAddressBySchemaName(char *schemaName, bool missing_ok); static List * FilterDistributedSchemas(List *schemas); -static void EnsureSequentialModeForSchemaDDL(void); static bool SchemaHasDistributedTableWithFKey(char *schemaName); static bool ShouldPropagateCreateSchemaStmt(void); @@ -62,7 +61,7 @@ PreprocessCreateSchemaStmt(Node *node, const char *queryString, EnsureCoordinator(); - EnsureSequentialModeForSchemaDDL(); + EnsureSequentialMode(OBJECT_SCHEMA); /* deparse sql*/ const char *sql = DeparseTreeNode(node); @@ -101,7 +100,7 @@ PreprocessDropSchemaStmt(Node *node, const char *queryString, return NIL; } - EnsureSequentialModeForSchemaDDL(); + EnsureSequentialMode(OBJECT_SCHEMA); Value *schemaVal = NULL; foreach_ptr(schemaVal, distributedSchemas) @@ -204,7 +203,7 @@ PreprocessAlterSchemaRenameStmt(Node *node, const char *queryString, /* deparse sql*/ const char *renameStmtSql = DeparseTreeNode(node); - EnsureSequentialModeForSchemaDDL(); + EnsureSequentialMode(OBJECT_SCHEMA); /* to prevent recursion with mx we disable ddl propagation */ List *commands = list_make3(DISABLE_DDL_PROPAGATION, @@ -293,44 +292,6 @@ FilterDistributedSchemas(List *schemas) } -/* - * EnsureSequentialModeForSchemaDDL makes sure that the current transaction is already in - * sequential mode, or can still safely be put in sequential mode, it errors if that is - * not possible. The error contains information for the user to retry the transaction with - * sequential mode set from the beginning. - * - * Copy-pasted from type.c - */ -static void -EnsureSequentialModeForSchemaDDL(void) -{ - if (!IsTransactionBlock()) - { - /* we do not need to switch to sequential mode if we are not in a transaction */ - return; - } - - if (ParallelQueryExecutedInTransaction()) - { - ereport(ERROR, (errmsg("cannot create or modify schema because there was a " - "parallel operation on a distributed table in the " - "transaction"), - errdetail("When creating, altering, or dropping a schema, Citus " - "needs to perform all operations over a single " - "connection per node to ensure consistency."), - errhint("Try re-running the transaction with " - "\"SET LOCAL citus.multi_shard_modify_mode TO " - "\'sequential\';\""))); - } - - ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), - errdetail("Schema is created or altered. To make sure subsequent " - "commands see the schema correctly we need to make sure to " - "use only one connection for all future commands"))); - SetLocalMultiShardModifyModeToSequential(); -} - - /* * SchemaHasDistributedTableWithFKey takes a schema name and scans the relations within * that schema. If any one of the relations has a foreign key relationship, it returns diff --git a/src/backend/distributed/commands/type.c b/src/backend/distributed/commands/type.c index 78b56c2ff..524124285 100644 --- a/src/backend/distributed/commands/type.c +++ b/src/backend/distributed/commands/type.c @@ -92,7 +92,6 @@ bool EnableCreateTypePropagation = true; static List * FilterNameListForDistributedTypes(List *objects, bool missing_ok); static List * TypeNameListToObjectAddresses(List *objects); static TypeName * MakeTypeNameFromRangeVar(const RangeVar *relation); -static void EnsureSequentialModeForTypeDDL(void); static Oid GetTypeOwner(Oid typeOid); /* recreate functions */ @@ -158,7 +157,7 @@ PreprocessCompositeTypeStmt(Node *node, const char *queryString, * when we allow propagation within a transaction block we should make sure to only * allow this in sequential mode */ - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) compositeTypeStmtSql, @@ -223,7 +222,7 @@ PreprocessAlterTypeStmt(Node *node, const char *queryString, * regardless if in a transaction or not. If we would not propagate the alter * statement the types would be different on worker and coordinator. */ - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) alterTypeStmtSql, @@ -266,7 +265,7 @@ PreprocessCreateEnumStmt(Node *node, const char *queryString, * when we allow propagation within a transaction block we should make sure to only * allow this in sequential mode */ - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); /* to prevent recursion with mx we disable ddl propagation */ List *commands = list_make3(DISABLE_DDL_PROPAGATION, @@ -325,7 +324,7 @@ PreprocessAlterEnumStmt(Node *node, const char *queryString, * (adding values to an enum can not run in a transaction anyway and would error by * postgres already). */ - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); /* * managing types can only be done on the coordinator if ddl propagation is on. when @@ -405,7 +404,7 @@ PreprocessDropTypeStmt(Node *node, const char *queryString, char *dropStmtSql = DeparseTreeNode((Node *) stmt); stmt->objects = oldTypes; - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); /* to prevent recursion with mx we disable ddl propagation */ List *commands = list_make3(DISABLE_DDL_PROPAGATION, @@ -442,7 +441,7 @@ PreprocessRenameTypeStmt(Node *node, const char *queryString, /* deparse sql*/ const char *renameStmtSql = DeparseTreeNode(node); - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); /* to prevent recursion with mx we disable ddl propagation */ List *commands = list_make3(DISABLE_DDL_PROPAGATION, @@ -480,7 +479,7 @@ PreprocessRenameTypeAttributeStmt(Node *node, const char *queryString, const char *sql = DeparseTreeNode((Node *) stmt); - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, ENABLE_DDL_PROPAGATION); @@ -513,7 +512,7 @@ PreprocessAlterTypeSchemaStmt(Node *node, const char *queryString, QualifyTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt); - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, @@ -572,7 +571,7 @@ PreprocessAlterTypeOwnerStmt(Node *node, const char *queryString, QualifyTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt); - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, ENABLE_DDL_PROPAGATION); @@ -1116,47 +1115,6 @@ MakeTypeNameFromRangeVar(const RangeVar *relation) } -/* - * EnsureSequentialModeForTypeDDL makes sure that the current transaction is already in - * sequential mode, or can still safely be put in sequential mode, it errors if that is - * not possible. The error contains information for the user to retry the transaction with - * sequential mode set from the beginning. - * - * As types are node scoped objects there exists only 1 instance of the type used by - * potentially multiple shards. To make sure all shards in the transaction can interact - * with the type the type needs to be visible on all connections used by the transaction, - * meaning we can only use 1 connection per node. - */ -static void -EnsureSequentialModeForTypeDDL(void) -{ - if (!IsTransactionBlock()) - { - /* we do not need to switch to sequential mode if we are not in a transaction */ - return; - } - - if (ParallelQueryExecutedInTransaction()) - { - ereport(ERROR, (errmsg("cannot create or modify type because there was a " - "parallel operation on a distributed table in the " - "transaction"), - errdetail("When creating or altering a type, Citus needs to " - "perform all operations over a single connection per " - "node to ensure consistency."), - errhint("Try re-running the transaction with " - "\"SET LOCAL citus.multi_shard_modify_mode TO " - "\'sequential\';\""))); - } - - ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), - errdetail("Type is created or altered. To make sure subsequent " - "commands see the type correctly we need to make sure to " - "use only one connection for all future commands"))); - SetLocalMultiShardModifyModeToSequential(); -} - - /* * ShouldPropagateTypeCreate returns if we should propagate the creation of a type. * diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index a101d2968..cc7f8d3ac 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -32,6 +32,7 @@ #include "distributed/distributed_planner.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_server_executor.h" +#include "distributed/relation_access_tracking.h" #include "distributed/resource_lock.h" #include "distributed/transaction_management.h" #include "distributed/version_compat.h" @@ -81,6 +82,7 @@ int ExecutorLevel = 0; /* local function forward declarations */ static Relation StubRelation(TupleDesc tupleDescriptor); +static char * GetObjectTypeString(ObjectType objType); static bool AlterTableConstraintCheck(QueryDesc *queryDesc); static List * FindCitusCustomScanStates(PlanState *planState); static bool CitusCustomScanStateWalker(PlanState *planState, @@ -691,6 +693,98 @@ SetLocalMultiShardModifyModeToSequential() } +/* + * EnsureSequentialMode makes sure that the current transaction is already in + * sequential mode, or can still safely be put in sequential mode, it errors if that is + * not possible. The error contains information for the user to retry the transaction with + * sequential mode set from the beginning. + * + * Takes an ObjectType to use in the error/debug messages. + */ +void +EnsureSequentialMode(ObjectType objType) +{ + char *objTypeString = GetObjectTypeString(objType); + + if (ParallelQueryExecutedInTransaction()) + { + ereport(ERROR, (errmsg("cannot run %s command because there was a " + "parallel operation on a distributed table in the " + "transaction", objTypeString), + errdetail("When running command on/for a distributed %s, Citus " + "needs to perform all operations over a single " + "connection per node to ensure consistency.", + objTypeString), + errhint("Try re-running the transaction with " + "\"SET LOCAL citus.multi_shard_modify_mode TO " + "\'sequential\';\""))); + } + + ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), + errdetail( + "A command for a distributed %s is run. To make sure subsequent " + "commands see the %s correctly we need to make sure to " + "use only one connection for all future commands", + objTypeString, objTypeString))); + + SetLocalMultiShardModifyModeToSequential(); +} + + +/* + * GetObjectTypeString takes an ObjectType and returns the string version of it. + * We (for now) call this function only in EnsureSequentialMode, and use the returned + * string to generate error/debug messages. + * + * If GetObjectTypeString gets called with an ObjectType that is not in the switch + * statement, the function will return the string "object", and emit a debug message. + * In that case, make sure you've added the newly supported type to the switch statement. + */ +static char * +GetObjectTypeString(ObjectType objType) +{ + switch (objType) + { + case OBJECT_COLLATION: + { + return "collation"; + } + + case OBJECT_DATABASE: + { + return "database"; + } + + case OBJECT_EXTENSION: + { + return "extension"; + } + + case OBJECT_FUNCTION: + { + return "function"; + } + + case OBJECT_SCHEMA: + { + return "schema"; + } + + case OBJECT_TYPE: + { + return "type"; + } + + default: + { + ereport(DEBUG1, (errmsg("unsupported object type"), + errdetail("Please add string conversion for the object."))); + return "object"; + } + } +} + + /* * AlterTableConstraintCheck returns if the given query is an ALTER TABLE * constraint check query. diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 3648dbc1b..dd10c511d 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -139,6 +139,7 @@ extern void ExecuteQueryIntoDestReceiver(Query *query, ParamListInfo params, extern void ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params, DestReceiver *dest); extern void SetLocalMultiShardModifyModeToSequential(void); +extern void EnsureSequentialMode(ObjectType objType); extern void SetLocalForceMaxQueryParallelization(void); extern void SortTupleStore(CitusScanState *scanState); extern bool DistributedPlanModifiesDatabase(DistributedPlan *plan); diff --git a/src/test/regress/expected/alter_database_owner.out b/src/test/regress/expected/alter_database_owner.out index 2e5e54aca..1fb118242 100644 --- a/src/test/regress/expected/alter_database_owner.out +++ b/src/test/regress/expected/alter_database_owner.out @@ -165,8 +165,8 @@ SELECT count(*) FROM t; -- parallel execution; (1 row) ALTER DATABASE regression OWNER TO database_owner_2; -- should ERROR -ERROR: cannot create or modify database because there was a parallel operation on a distributed table in the transaction -DETAIL: When creating or altering a database, Citus needs to perform all operations over a single connection per node to ensure consistency. +ERROR: cannot run database command because there was a parallel operation on a distributed table in the transaction +DETAIL: When running command on/for a distributed database, Citus needs to perform all operations over a single connection per node to ensure consistency. HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';" ROLLBACK; -- list the owners of the current database on all nodes diff --git a/src/test/regress/expected/forcedelegation_functions.out b/src/test/regress/expected/forcedelegation_functions.out index abfcaced8..c4e07d5f9 100644 --- a/src/test/regress/expected/forcedelegation_functions.out +++ b/src/test/regress/expected/forcedelegation_functions.out @@ -304,7 +304,7 @@ END; $$ LANGUAGE plpgsql; SELECT create_distributed_function('func_calls_forcepush_func()'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -312,7 +312,7 @@ DETAIL: A distributed function is created. To make sure subsequent commands see SELECT create_distributed_function('inner_force_delegation_function(int)', '$1', colocate_with := 'test_nested', force_delegation := true); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -475,7 +475,7 @@ END; $$ LANGUAGE plpgsql; SELECT create_distributed_function('test_recursive(int)', '$1', colocate_with := 'test_nested', force_delegation := true); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -550,7 +550,7 @@ SELECT create_distributed_function( colocate_with := 'test_forcepushdown', force_delegation := true); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -620,7 +620,7 @@ END; $$ LANGUAGE plpgsql; SELECT create_distributed_function('inner_emp(text)','empname', force_delegation := true); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -656,7 +656,7 @@ SELECT create_distributed_function( force_delegation := true ); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -731,7 +731,7 @@ SELECT create_distributed_function( force_delegation := true ); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -809,7 +809,7 @@ SELECT create_distributed_function( force_delegation := true ); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -827,7 +827,7 @@ SELECT create_distributed_function( force_delegation := true ); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -845,7 +845,7 @@ SELECT create_distributed_function( force_delegation := true ); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -953,7 +953,7 @@ SELECT create_distributed_function( force_delegation := true ); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -975,7 +975,7 @@ SELECT create_distributed_function( force_delegation := true ); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -996,7 +996,7 @@ SELECT create_distributed_function( force_delegation := true ); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -1017,7 +1017,7 @@ SELECT create_distributed_function( force_delegation := true ); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -1039,7 +1039,7 @@ SELECT create_distributed_function( force_delegation := true ); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -1205,7 +1205,7 @@ END; $$ LANGUAGE plpgsql; SELECT create_distributed_function('test_prepare(int,int)','x',force_delegation :=true, colocate_with := 'table_test_prepare'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -1400,7 +1400,7 @@ $$ LANGUAGE plpgsql; SELECT create_distributed_function('test(int)', 'x', colocate_with := 'test_perform', force_delegation := true); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- diff --git a/src/test/regress/expected/local_dist_join_mixed.out b/src/test/regress/expected/local_dist_join_mixed.out index 0e3e0aad4..5566186b5 100644 --- a/src/test/regress/expected/local_dist_join_mixed.out +++ b/src/test/regress/expected/local_dist_join_mixed.out @@ -1602,6 +1602,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c (1 row) DROP SCHEMA local_dist_join_mixed CASCADE; +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed schema is run. To make sure subsequent commands see the schema correctly we need to make sure to use only one connection for all future commands NOTICE: drop cascades to 7 other objects DETAIL: drop cascades to table distributed drop cascades to table reference diff --git a/src/test/regress/expected/multi_function_in_join.out b/src/test/regress/expected/multi_function_in_join.out index 5d1b2a672..c80b26c57 100644 --- a/src/test/regress/expected/multi_function_in_join.out +++ b/src/test/regress/expected/multi_function_in_join.out @@ -41,7 +41,7 @@ AS 'SELECT $1 + $2;' LANGUAGE SQL; SELECT create_distributed_function('add(integer,integer)'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_function_in_join_0.out b/src/test/regress/expected/multi_function_in_join_0.out index 21279ab8d..5b818855b 100644 --- a/src/test/regress/expected/multi_function_in_join_0.out +++ b/src/test/regress/expected/multi_function_in_join_0.out @@ -41,7 +41,7 @@ AS 'SELECT $1 + $2;' LANGUAGE SQL; SELECT create_distributed_function('add(integer,integer)'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_mx_add_coordinator.out b/src/test/regress/expected/multi_mx_add_coordinator.out index 33aec18d8..045563d6f 100644 --- a/src/test/regress/expected/multi_mx_add_coordinator.out +++ b/src/test/regress/expected/multi_mx_add_coordinator.out @@ -141,7 +141,7 @@ END; $$; SELECT create_distributed_function('my_group_id()', colocate_with := 'ref'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_mx_call.out b/src/test/regress/expected/multi_mx_call.out index 64b033d41..37c989885 100644 --- a/src/test/regress/expected/multi_mx_call.out +++ b/src/test/regress/expected/multi_mx_call.out @@ -416,7 +416,7 @@ CALL multi_mx_call.mx_call_proc_tx(10); -- after distribution ... select create_distributed_function('mx_call_proc_tx(int)', '$1', 'mx_call_dist_table_1'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -488,7 +488,7 @@ BEGIN END;$$; select create_distributed_function('mx_call_proc_raise(int)', '$1', 'mx_call_dist_table_1'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -554,7 +554,7 @@ CREATE FUNCTION mx_call_add(int, int) RETURNS int AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE; SELECT create_distributed_function('mx_call_add(int,int)'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_mx_call_0.out b/src/test/regress/expected/multi_mx_call_0.out index 496e735c9..6f33d4dae 100644 --- a/src/test/regress/expected/multi_mx_call_0.out +++ b/src/test/regress/expected/multi_mx_call_0.out @@ -416,7 +416,7 @@ CALL multi_mx_call.mx_call_proc_tx(10); -- after distribution ... select create_distributed_function('mx_call_proc_tx(int)', '$1', 'mx_call_dist_table_1'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -488,7 +488,7 @@ BEGIN END;$$; select create_distributed_function('mx_call_proc_raise(int)', '$1', 'mx_call_dist_table_1'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -554,7 +554,7 @@ CREATE FUNCTION mx_call_add(int, int) RETURNS int AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE; SELECT create_distributed_function('mx_call_add(int,int)'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_mx_function_call_delegation.out b/src/test/regress/expected/multi_mx_function_call_delegation.out index 8192ee35c..954c23420 100644 --- a/src/test/regress/expected/multi_mx_function_call_delegation.out +++ b/src/test/regress/expected/multi_mx_function_call_delegation.out @@ -213,7 +213,7 @@ select colocate_proc_with_table('squares', 'mx_call_dist_table_2'::regclass, 0); select create_distributed_function('mx_call_func_bigint(bigint,bigint)', 'x', colocate_with := 'mx_call_dist_table_bigint'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -224,7 +224,7 @@ select create_distributed_function('mx_call_func_bigint_force(bigint,bigint)', ' colocate_with := 'mx_call_dist_table_2', force_delegation := true); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -420,7 +420,7 @@ select mx_call_func_tbl(10); -- after distribution ... select create_distributed_function('mx_call_func_tbl(int)', '$1', 'mx_call_dist_table_1'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -443,7 +443,7 @@ BEGIN END;$$; select create_distributed_function('mx_call_func_raise(int)', '$1', 'mx_call_dist_table_1'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -595,7 +595,7 @@ CREATE FUNCTION mx_call_add(int, int) RETURNS int AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE; SELECT create_distributed_function('mx_call_add(int,int)', '$1'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_mx_function_call_delegation_0.out b/src/test/regress/expected/multi_mx_function_call_delegation_0.out index 9125847b4..66c0f89d2 100644 --- a/src/test/regress/expected/multi_mx_function_call_delegation_0.out +++ b/src/test/regress/expected/multi_mx_function_call_delegation_0.out @@ -213,7 +213,7 @@ select colocate_proc_with_table('squares', 'mx_call_dist_table_2'::regclass, 0); select create_distributed_function('mx_call_func_bigint(bigint,bigint)', 'x', colocate_with := 'mx_call_dist_table_bigint'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -224,7 +224,7 @@ select create_distributed_function('mx_call_func_bigint_force(bigint,bigint)', ' colocate_with := 'mx_call_dist_table_2', force_delegation := true); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -420,7 +420,7 @@ select mx_call_func_tbl(10); -- after distribution ... select create_distributed_function('mx_call_func_tbl(int)', '$1', 'mx_call_dist_table_1'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -443,7 +443,7 @@ BEGIN END;$$; select create_distributed_function('mx_call_func_raise(int)', '$1', 'mx_call_dist_table_1'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -595,7 +595,7 @@ CREATE FUNCTION mx_call_add(int, int) RETURNS int AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE; SELECT create_distributed_function('mx_call_add(int,int)', '$1'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_schema_support.out b/src/test/regress/expected/multi_schema_support.out index ee4879edf..6e3ac0dc8 100644 --- a/src/test/regress/expected/multi_schema_support.out +++ b/src/test/regress/expected/multi_schema_support.out @@ -1428,8 +1428,8 @@ BEGIN; (1 row) ALTER SCHEMA bar RENAME TO foo; -ERROR: cannot create or modify schema because there was a parallel operation on a distributed table in the transaction -DETAIL: When creating, altering, or dropping a schema, Citus needs to perform all operations over a single connection per node to ensure consistency. +ERROR: cannot run schema command because there was a parallel operation on a distributed table in the transaction +DETAIL: When running command on/for a distributed schema, Citus needs to perform all operations over a single connection per node to ensure consistency. HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';" ROLLBACK; BEGIN; diff --git a/src/test/regress/expected/non_colocated_subquery_joins.out b/src/test/regress/expected/non_colocated_subquery_joins.out index 4fa460f4a..0776f47cf 100644 --- a/src/test/regress/expected/non_colocated_subquery_joins.out +++ b/src/test/regress/expected/non_colocated_subquery_joins.out @@ -8,6 +8,8 @@ -- =================================================================== SET client_min_messages TO DEBUG1; CREATE SCHEMA non_colocated_subquery; +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed schema is run. To make sure subsequent commands see the schema correctly we need to make sure to use only one connection for all future commands SET search_path TO non_colocated_subquery, public; -- we don't use the data anyway CREATE TABLE users_table_local AS SELECT * FROM users_table LIMIT 0;