diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 7777554b2..ae8d8c625 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -32,6 +32,8 @@ why we ask this as well as instructions for how to proceed, see the ./configure make make install + # Optionally, you might instead want to use `make install-all` + # since `multi_extension` regression test would fail due to missing downgrade scripts. cd src/test/regress make check ``` @@ -51,7 +53,7 @@ why we ask this as well as instructions for how to proceed, see the autoconf flex git libcurl4-gnutls-dev libicu-dev \ libkrb5-dev liblz4-dev libpam0g-dev libreadline-dev \ libselinux1-dev libssl-dev libxslt1-dev libzstd-dev \ - make uuid-dev + make uuid-dev mitmproxy ``` 2. Get, build, and test the code @@ -62,6 +64,8 @@ why we ask this as well as instructions for how to proceed, see the ./configure make sudo make install + # Optionally, you might instead want to use `sudo make install-all` + # since `multi_extension` regression test would fail due to missing downgrade scripts. cd src/test/regress make check ``` @@ -104,6 +108,8 @@ why we ask this as well as instructions for how to proceed, see the PG_CONFIG=/usr/pgsql-14/bin/pg_config ./configure make sudo make install + # Optionally, you might instead want to use `sudo make install-all` + # since `multi_extension` regression test would fail due to missing downgrade scripts. cd src/test/regress make check ``` @@ -125,6 +131,8 @@ cd build cmake .. make -j5 sudo make install +# Optionally, you might instead want to use `sudo make install-all` +# since `multi_extension` regression test would fail due to missing downgrade scripts. cd ../.. git clone https://github.com/citusdata/tools.git diff --git a/src/backend/distributed/commands/collation.c b/src/backend/distributed/commands/collation.c index ddbe63f52..d712c18ab 100644 --- a/src/backend/distributed/commands/collation.c +++ b/src/backend/distributed/commands/collation.c @@ -37,7 +37,6 @@ static char * CreateCollationDDLInternal(Oid collationId, Oid *collowner, char **quotedCollationName); static List * FilterNameListForDistributedCollations(List *objects, bool missing_ok, List **addresses); -static void EnsureSequentialModeForCollationDDL(void); /* @@ -256,7 +255,7 @@ PreprocessDropCollationStmt(Node *node, const char *queryString, char *dropStmtSql = DeparseTreeNode((Node *) stmt); stmt->objects = oldCollations; - EnsureSequentialModeForCollationDDL(); + EnsureSequentialMode(OBJECT_COLLATION); /* to prevent recursion with mx we disable ddl propagation */ List *commands = list_make3(DISABLE_DDL_PROPAGATION, @@ -292,7 +291,7 @@ PreprocessAlterCollationOwnerStmt(Node *node, const char *queryString, QualifyTreeNode((Node *) stmt); char *sql = DeparseTreeNode((Node *) stmt); - EnsureSequentialModeForCollationDDL(); + EnsureSequentialMode(OBJECT_COLLATION); List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, ENABLE_DDL_PROPAGATION); @@ -328,7 +327,7 @@ PreprocessRenameCollationStmt(Node *node, const char *queryString, /* deparse sql*/ char *renameStmtSql = DeparseTreeNode((Node *) stmt); - EnsureSequentialModeForCollationDDL(); + EnsureSequentialMode(OBJECT_COLLATION); /* to prevent recursion with mx we disable ddl propagation */ List *commands = list_make3(DISABLE_DDL_PROPAGATION, @@ -363,7 +362,7 @@ PreprocessAlterCollationSchemaStmt(Node *node, const char *queryString, QualifyTreeNode((Node *) stmt); char *sql = DeparseTreeNode((Node *) stmt); - EnsureSequentialModeForCollationDDL(); + EnsureSequentialMode(OBJECT_COLLATION); List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, @@ -453,47 +452,6 @@ AlterCollationSchemaStmtObjectAddress(Node *node, bool missing_ok) } -/* - * EnsureSequentialModeForCollationDDL makes sure that the current transaction is already in - * sequential mode, or can still safely be put in sequential mode, it errors if that is - * not possible. The error contains information for the user to retry the transaction with - * sequential mode set from the beginning. - * - * As collations are node scoped objects there exists only 1 instance of the collation used by - * potentially multiple shards. To make sure all shards in the transaction can interact - * with the type the type needs to be visible on all connections used by the transaction, - * meaning we can only use 1 connection per node. - */ -static void -EnsureSequentialModeForCollationDDL(void) -{ - if (!IsTransactionBlock()) - { - /* we do not need to switch to sequential mode if we are not in a transaction */ - return; - } - - if (ParallelQueryExecutedInTransaction()) - { - ereport(ERROR, (errmsg("cannot create or modify collation because there was a " - "parallel operation on a distributed table in the " - "transaction"), - errdetail("When creating or altering a collation, Citus needs to " - "perform all operations over a single connection per " - "node to ensure consistency."), - errhint("Try re-running the transaction with " - "\"SET LOCAL citus.multi_shard_modify_mode TO " - "\'sequential\';\""))); - } - - ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), - errdetail("Collation is created or altered. To make sure subsequent " - "commands see the collation correctly we need to make sure to " - "use only one connection for all future commands"))); - SetLocalMultiShardModifyModeToSequential(); -} - - /* * GenerateBackupNameForCollationCollision generates a new collation name for an existing collation. * The name is generated in such a way that the new name doesn't overlap with an existing collation diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 43bd1ebd6..eb51195bf 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -60,6 +60,7 @@ #include "distributed/remote_commands.h" #include "distributed/shared_library_init.h" #include "distributed/worker_protocol.h" +#include "distributed/worker_shard_visibility.h" #include "distributed/worker_transaction.h" #include "distributed/version_compat.h" #include "executor/executor.h" @@ -327,6 +328,7 @@ create_reference_table(PG_FUNCTION_ARGS) * - we are on the coordinator * - the current user is the owner of the table * - relation kind is supported + * - relation is not a shard */ static void EnsureCitusTableCanBeCreated(Oid relationOid) @@ -343,6 +345,14 @@ EnsureCitusTableCanBeCreated(Oid relationOid) * will be performed in CreateDistributedTable. */ EnsureRelationKindSupported(relationOid); + + /* + * When coordinator is added to the metadata, or on the workers, + * some of the relations of the coordinator node may/will be shards. + * We disallow creating distributed tables from shard relations, by + * erroring out here. + */ + ErrorIfRelationIsAKnownShard(relationOid); } diff --git a/src/backend/distributed/commands/database.c b/src/backend/distributed/commands/database.c index b4fb15110..59902b038 100644 --- a/src/backend/distributed/commands/database.c +++ b/src/backend/distributed/commands/database.c @@ -29,7 +29,6 @@ #include "distributed/relation_access_tracking.h" #include "distributed/worker_transaction.h" -static void EnsureSequentialModeForDatabaseDDL(void); static AlterOwnerStmt * RecreateAlterDatabaseOwnerStmt(Oid databaseOid); static Oid get_database_owner(Oid db_oid); @@ -66,7 +65,7 @@ PreprocessAlterDatabaseOwnerStmt(Node *node, const char *queryString, QualifyTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt); - EnsureSequentialModeForDatabaseDDL(); + EnsureSequentialMode(OBJECT_DATABASE); List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, ENABLE_DDL_PROPAGATION); @@ -177,39 +176,3 @@ get_database_owner(Oid db_oid) return dba; } - - -/* - * EnsureSequentialModeForDatabaseDDL makes sure that the current transaction is already - * in sequential mode, or can still safely be put in sequential mode, it errors if that is - * not possible. The error contains information for the user to retry the transaction with - * sequential mode set from the beginning. - */ -static void -EnsureSequentialModeForDatabaseDDL(void) -{ - if (!IsTransactionBlock()) - { - /* we do not need to switch to sequential mode if we are not in a transaction */ - return; - } - - if (ParallelQueryExecutedInTransaction()) - { - ereport(ERROR, (errmsg("cannot create or modify database because there was a " - "parallel operation on a distributed table in the " - "transaction"), - errdetail("When creating or altering a database, Citus needs to " - "perform all operations over a single connection per " - "node to ensure consistency."), - errhint("Try re-running the transaction with " - "\"SET LOCAL citus.multi_shard_modify_mode TO " - "\'sequential\';\""))); - } - - ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), - errdetail("Database is created or altered. To make sure subsequent " - "commands see the type correctly we need to make sure to " - "use only one connection for all future commands"))); - SetLocalMultiShardModifyModeToSequential(); -} diff --git a/src/backend/distributed/commands/extension.c b/src/backend/distributed/commands/extension.c index ef5b6c1b1..dc1363a65 100644 --- a/src/backend/distributed/commands/extension.c +++ b/src/backend/distributed/commands/extension.c @@ -37,7 +37,6 @@ static void AddSchemaFieldIfMissing(CreateExtensionStmt *stmt); static List * FilterDistributedExtensions(List *extensionObjectList); static List * ExtensionNameListToObjectAddressList(List *extensionObjectList); static void MarkExistingObjectDependenciesDistributedIfSupported(void); -static void EnsureSequentialModeForExtensionDDL(void); static bool ShouldPropagateExtensionCommand(Node *parseTree); static bool IsAlterExtensionSetSchemaCitus(Node *parseTree); static Node * RecreateExtensionStmt(Oid extensionOid); @@ -163,7 +162,7 @@ PostprocessCreateExtensionStmt(Node *node, const char *queryString) * Make sure that the current transaction is already in sequential mode, * or can still safely be put in sequential mode */ - EnsureSequentialModeForExtensionDDL(); + EnsureSequentialMode(OBJECT_EXTENSION); /* * Here we append "schema" field to the "options" list (if not specified) @@ -274,7 +273,7 @@ PreprocessDropExtensionStmt(Node *node, const char *queryString, * Make sure that the current transaction is already in sequential mode, * or can still safely be put in sequential mode */ - EnsureSequentialModeForExtensionDDL(); + EnsureSequentialMode(OBJECT_EXTENSION); List *distributedExtensionAddresses = ExtensionNameListToObjectAddressList( distributedExtensions); @@ -409,7 +408,7 @@ PreprocessAlterExtensionSchemaStmt(Node *node, const char *queryString, * Make sure that the current transaction is already in sequential mode, * or can still safely be put in sequential mode */ - EnsureSequentialModeForExtensionDDL(); + EnsureSequentialMode(OBJECT_EXTENSION); const char *alterExtensionStmtSql = DeparseTreeNode(node); @@ -478,7 +477,7 @@ PreprocessAlterExtensionUpdateStmt(Node *node, const char *queryString, * Make sure that the current transaction is already in sequential mode, * or can still safely be put in sequential mode */ - EnsureSequentialModeForExtensionDDL(); + EnsureSequentialMode(OBJECT_EXTENSION); const char *alterExtensionStmtSql = DeparseTreeNode((Node *) alterExtensionStmt); @@ -603,44 +602,6 @@ PreprocessAlterExtensionContentsStmt(Node *node, const char *queryString, } -/* - * EnsureSequentialModeForExtensionDDL makes sure that the current transaction is already in - * sequential mode, or can still safely be put in sequential mode, it errors if that is - * not possible. The error contains information for the user to retry the transaction with - * sequential mode set from the beginning. - * - * As extensions are node scoped objects there exists only 1 instance of the - * extension used by potentially multiple shards. To make sure all shards in - * the transaction can interact with the extension the extension needs to be - * visible on all connections used by the transaction, meaning we can only use - * 1 connection per node. - */ -static void -EnsureSequentialModeForExtensionDDL(void) -{ - if (ParallelQueryExecutedInTransaction()) - { - ereport(ERROR, (errmsg("cannot run extension command because there was a " - "parallel operation on a distributed table in the " - "transaction"), - errdetail( - "When running command on/for a distributed extension, Citus needs to " - "perform all operations over a single connection per " - "node to ensure consistency."), - errhint("Try re-running the transaction with " - "\"SET LOCAL citus.multi_shard_modify_mode TO " - "\'sequential\';\""))); - } - - ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), - errdetail( - "A command for a distributed extension is run. To make sure subsequent " - "commands see the type correctly we need to make sure to " - "use only one connection for all future commands"))); - SetLocalMultiShardModifyModeToSequential(); -} - - /* * ShouldPropagateExtensionCommand determines whether to propagate an extension * command to the worker nodes. diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 59272e472..42b3fc7e0 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -79,7 +79,6 @@ static int GetFunctionColocationId(Oid functionOid, char *colocateWithName, Oid static void EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid distributionColumnType, Oid sourceRelationId); -static void EnsureSequentialModeForFunctionDDL(void); static bool ShouldPropagateCreateFunction(CreateFunctionStmt *stmt); static bool ShouldPropagateAlterFunction(const ObjectAddress *address); static bool ShouldAddFunctionSignature(FunctionParameterMode mode); @@ -211,7 +210,7 @@ create_distributed_function(PG_FUNCTION_ARGS) * when we allow propagation within a transaction block we should make sure * to only allow this in sequential mode. */ - EnsureSequentialModeForFunctionDDL(); + EnsureSequentialMode(OBJECT_FUNCTION); EnsureDependenciesExistOnAllNodes(&functionAddress); @@ -1173,48 +1172,6 @@ GetAggregateDDLCommand(const RegProcedure funcOid, bool useCreateOrReplace) } -/* - * EnsureSequentialModeForFunctionDDL makes sure that the current transaction is already in - * sequential mode, or can still safely be put in sequential mode, it errors if that is - * not possible. The error contains information for the user to retry the transaction with - * sequential mode set from the beginning. - * - * As functions are node scoped objects there exists only 1 instance of the function used by - * potentially multiple shards. To make sure all shards in the transaction can interact - * with the function the function needs to be visible on all connections used by the transaction, - * meaning we can only use 1 connection per node. - */ -static void -EnsureSequentialModeForFunctionDDL(void) -{ - if (!IsTransactionBlock()) - { - /* we do not need to switch to sequential mode if we are not in a transaction */ - return; - } - - if (ParallelQueryExecutedInTransaction()) - { - ereport(ERROR, (errmsg("cannot create function because there was a " - "parallel operation on a distributed table in the " - "transaction"), - errdetail("When creating a distributed function, Citus needs to " - "perform all operations over a single connection per " - "node to ensure consistency."), - errhint("Try re-running the transaction with " - "\"SET LOCAL citus.multi_shard_modify_mode TO " - "\'sequential\';\""))); - } - - ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), - errdetail( - "A distributed function is created. To make sure subsequent " - "commands see the type correctly we need to make sure to " - "use only one connection for all future commands"))); - SetLocalMultiShardModifyModeToSequential(); -} - - /* * ShouldPropagateCreateFunction tests if we need to propagate a CREATE FUNCTION * statement. @@ -1298,7 +1255,7 @@ PreprocessCreateFunctionStmt(Node *node, const char *queryString, EnsureCoordinator(); - EnsureSequentialModeForFunctionDDL(); + EnsureSequentialMode(OBJECT_FUNCTION); /* * ddl jobs will be generated during the postprocessing phase as we need the function to @@ -1483,7 +1440,7 @@ PreprocessAlterFunctionStmt(Node *node, const char *queryString, EnsureCoordinator(); ErrorIfUnsupportedAlterFunctionStmt(stmt); - EnsureSequentialModeForFunctionDDL(); + EnsureSequentialMode(OBJECT_FUNCTION); QualifyTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt); @@ -1517,7 +1474,7 @@ PreprocessRenameFunctionStmt(Node *node, const char *queryString, } EnsureCoordinator(); - EnsureSequentialModeForFunctionDDL(); + EnsureSequentialMode(OBJECT_FUNCTION); QualifyTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt); @@ -1549,7 +1506,7 @@ PreprocessAlterFunctionSchemaStmt(Node *node, const char *queryString, } EnsureCoordinator(); - EnsureSequentialModeForFunctionDDL(); + EnsureSequentialMode(OBJECT_FUNCTION); QualifyTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt); @@ -1582,7 +1539,7 @@ PreprocessAlterFunctionOwnerStmt(Node *node, const char *queryString, } EnsureCoordinator(); - EnsureSequentialModeForFunctionDDL(); + EnsureSequentialMode(OBJECT_FUNCTION); QualifyTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt); @@ -1672,7 +1629,7 @@ PreprocessDropFunctionStmt(Node *node, const char *queryString, * types, so we block the call. */ EnsureCoordinator(); - EnsureSequentialModeForFunctionDDL(); + EnsureSequentialMode(OBJECT_FUNCTION); /* remove the entries for the distributed objects on dropping */ ObjectAddress *address = NULL; diff --git a/src/backend/distributed/commands/schema.c b/src/backend/distributed/commands/schema.c index adb26b5e7..e0c0f2b11 100644 --- a/src/backend/distributed/commands/schema.c +++ b/src/backend/distributed/commands/schema.c @@ -42,7 +42,6 @@ static ObjectAddress GetObjectAddressBySchemaName(char *schemaName, bool missing_ok); static List * FilterDistributedSchemas(List *schemas); -static void EnsureSequentialModeForSchemaDDL(void); static bool SchemaHasDistributedTableWithFKey(char *schemaName); static bool ShouldPropagateCreateSchemaStmt(void); @@ -62,7 +61,7 @@ PreprocessCreateSchemaStmt(Node *node, const char *queryString, EnsureCoordinator(); - EnsureSequentialModeForSchemaDDL(); + EnsureSequentialMode(OBJECT_SCHEMA); /* deparse sql*/ const char *sql = DeparseTreeNode(node); @@ -101,7 +100,7 @@ PreprocessDropSchemaStmt(Node *node, const char *queryString, return NIL; } - EnsureSequentialModeForSchemaDDL(); + EnsureSequentialMode(OBJECT_SCHEMA); Value *schemaVal = NULL; foreach_ptr(schemaVal, distributedSchemas) @@ -204,7 +203,7 @@ PreprocessAlterSchemaRenameStmt(Node *node, const char *queryString, /* deparse sql*/ const char *renameStmtSql = DeparseTreeNode(node); - EnsureSequentialModeForSchemaDDL(); + EnsureSequentialMode(OBJECT_SCHEMA); /* to prevent recursion with mx we disable ddl propagation */ List *commands = list_make3(DISABLE_DDL_PROPAGATION, @@ -293,44 +292,6 @@ FilterDistributedSchemas(List *schemas) } -/* - * EnsureSequentialModeForSchemaDDL makes sure that the current transaction is already in - * sequential mode, or can still safely be put in sequential mode, it errors if that is - * not possible. The error contains information for the user to retry the transaction with - * sequential mode set from the beginning. - * - * Copy-pasted from type.c - */ -static void -EnsureSequentialModeForSchemaDDL(void) -{ - if (!IsTransactionBlock()) - { - /* we do not need to switch to sequential mode if we are not in a transaction */ - return; - } - - if (ParallelQueryExecutedInTransaction()) - { - ereport(ERROR, (errmsg("cannot create or modify schema because there was a " - "parallel operation on a distributed table in the " - "transaction"), - errdetail("When creating, altering, or dropping a schema, Citus " - "needs to perform all operations over a single " - "connection per node to ensure consistency."), - errhint("Try re-running the transaction with " - "\"SET LOCAL citus.multi_shard_modify_mode TO " - "\'sequential\';\""))); - } - - ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), - errdetail("Schema is created or altered. To make sure subsequent " - "commands see the schema correctly we need to make sure to " - "use only one connection for all future commands"))); - SetLocalMultiShardModifyModeToSequential(); -} - - /* * SchemaHasDistributedTableWithFKey takes a schema name and scans the relations within * that schema. If any one of the relations has a foreign key relationship, it returns diff --git a/src/backend/distributed/commands/type.c b/src/backend/distributed/commands/type.c index 81727afde..eb59e8522 100644 --- a/src/backend/distributed/commands/type.c +++ b/src/backend/distributed/commands/type.c @@ -92,7 +92,6 @@ bool EnableCreateTypePropagation = true; static List * FilterNameListForDistributedTypes(List *objects, bool missing_ok); static List * TypeNameListToObjectAddresses(List *objects); static TypeName * MakeTypeNameFromRangeVar(const RangeVar *relation); -static void EnsureSequentialModeForTypeDDL(void); static Oid GetTypeOwner(Oid typeOid); /* recreate functions */ @@ -158,7 +157,7 @@ PreprocessCompositeTypeStmt(Node *node, const char *queryString, * when we allow propagation within a transaction block we should make sure to only * allow this in sequential mode */ - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) compositeTypeStmtSql, @@ -223,7 +222,7 @@ PreprocessAlterTypeStmt(Node *node, const char *queryString, * regardless if in a transaction or not. If we would not propagate the alter * statement the types would be different on worker and coordinator. */ - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) alterTypeStmtSql, @@ -266,7 +265,7 @@ PreprocessCreateEnumStmt(Node *node, const char *queryString, * when we allow propagation within a transaction block we should make sure to only * allow this in sequential mode */ - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); /* to prevent recursion with mx we disable ddl propagation */ List *commands = list_make3(DISABLE_DDL_PROPAGATION, @@ -325,7 +324,7 @@ PreprocessAlterEnumStmt(Node *node, const char *queryString, * (adding values to an enum can not run in a transaction anyway and would error by * postgres already). */ - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); /* * managing types can only be done on the coordinator if ddl propagation is on. when @@ -405,7 +404,7 @@ PreprocessDropTypeStmt(Node *node, const char *queryString, char *dropStmtSql = DeparseTreeNode((Node *) stmt); stmt->objects = oldTypes; - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); /* to prevent recursion with mx we disable ddl propagation */ List *commands = list_make3(DISABLE_DDL_PROPAGATION, @@ -442,7 +441,7 @@ PreprocessRenameTypeStmt(Node *node, const char *queryString, /* deparse sql*/ const char *renameStmtSql = DeparseTreeNode(node); - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); /* to prevent recursion with mx we disable ddl propagation */ List *commands = list_make3(DISABLE_DDL_PROPAGATION, @@ -480,7 +479,7 @@ PreprocessRenameTypeAttributeStmt(Node *node, const char *queryString, const char *sql = DeparseTreeNode((Node *) stmt); - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, ENABLE_DDL_PROPAGATION); @@ -513,7 +512,7 @@ PreprocessAlterTypeSchemaStmt(Node *node, const char *queryString, QualifyTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt); - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, @@ -572,7 +571,7 @@ PreprocessAlterTypeOwnerStmt(Node *node, const char *queryString, QualifyTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt); - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, ENABLE_DDL_PROPAGATION); @@ -1130,47 +1129,6 @@ MakeTypeNameFromRangeVar(const RangeVar *relation) } -/* - * EnsureSequentialModeForTypeDDL makes sure that the current transaction is already in - * sequential mode, or can still safely be put in sequential mode, it errors if that is - * not possible. The error contains information for the user to retry the transaction with - * sequential mode set from the beginning. - * - * As types are node scoped objects there exists only 1 instance of the type used by - * potentially multiple shards. To make sure all shards in the transaction can interact - * with the type the type needs to be visible on all connections used by the transaction, - * meaning we can only use 1 connection per node. - */ -static void -EnsureSequentialModeForTypeDDL(void) -{ - if (!IsTransactionBlock()) - { - /* we do not need to switch to sequential mode if we are not in a transaction */ - return; - } - - if (ParallelQueryExecutedInTransaction()) - { - ereport(ERROR, (errmsg("cannot create or modify type because there was a " - "parallel operation on a distributed table in the " - "transaction"), - errdetail("When creating or altering a type, Citus needs to " - "perform all operations over a single connection per " - "node to ensure consistency."), - errhint("Try re-running the transaction with " - "\"SET LOCAL citus.multi_shard_modify_mode TO " - "\'sequential\';\""))); - } - - ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), - errdetail("Type is created or altered. To make sure subsequent " - "commands see the type correctly we need to make sure to " - "use only one connection for all future commands"))); - SetLocalMultiShardModifyModeToSequential(); -} - - /* * ShouldPropagateTypeCreate returns if we should propagate the creation of a type. * diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index a101d2968..cc7f8d3ac 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -32,6 +32,7 @@ #include "distributed/distributed_planner.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_server_executor.h" +#include "distributed/relation_access_tracking.h" #include "distributed/resource_lock.h" #include "distributed/transaction_management.h" #include "distributed/version_compat.h" @@ -81,6 +82,7 @@ int ExecutorLevel = 0; /* local function forward declarations */ static Relation StubRelation(TupleDesc tupleDescriptor); +static char * GetObjectTypeString(ObjectType objType); static bool AlterTableConstraintCheck(QueryDesc *queryDesc); static List * FindCitusCustomScanStates(PlanState *planState); static bool CitusCustomScanStateWalker(PlanState *planState, @@ -691,6 +693,98 @@ SetLocalMultiShardModifyModeToSequential() } +/* + * EnsureSequentialMode makes sure that the current transaction is already in + * sequential mode, or can still safely be put in sequential mode, it errors if that is + * not possible. The error contains information for the user to retry the transaction with + * sequential mode set from the beginning. + * + * Takes an ObjectType to use in the error/debug messages. + */ +void +EnsureSequentialMode(ObjectType objType) +{ + char *objTypeString = GetObjectTypeString(objType); + + if (ParallelQueryExecutedInTransaction()) + { + ereport(ERROR, (errmsg("cannot run %s command because there was a " + "parallel operation on a distributed table in the " + "transaction", objTypeString), + errdetail("When running command on/for a distributed %s, Citus " + "needs to perform all operations over a single " + "connection per node to ensure consistency.", + objTypeString), + errhint("Try re-running the transaction with " + "\"SET LOCAL citus.multi_shard_modify_mode TO " + "\'sequential\';\""))); + } + + ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), + errdetail( + "A command for a distributed %s is run. To make sure subsequent " + "commands see the %s correctly we need to make sure to " + "use only one connection for all future commands", + objTypeString, objTypeString))); + + SetLocalMultiShardModifyModeToSequential(); +} + + +/* + * GetObjectTypeString takes an ObjectType and returns the string version of it. + * We (for now) call this function only in EnsureSequentialMode, and use the returned + * string to generate error/debug messages. + * + * If GetObjectTypeString gets called with an ObjectType that is not in the switch + * statement, the function will return the string "object", and emit a debug message. + * In that case, make sure you've added the newly supported type to the switch statement. + */ +static char * +GetObjectTypeString(ObjectType objType) +{ + switch (objType) + { + case OBJECT_COLLATION: + { + return "collation"; + } + + case OBJECT_DATABASE: + { + return "database"; + } + + case OBJECT_EXTENSION: + { + return "extension"; + } + + case OBJECT_FUNCTION: + { + return "function"; + } + + case OBJECT_SCHEMA: + { + return "schema"; + } + + case OBJECT_TYPE: + { + return "type"; + } + + default: + { + ereport(DEBUG1, (errmsg("unsupported object type"), + errdetail("Please add string conversion for the object."))); + return "object"; + } + } +} + + /* * AlterTableConstraintCheck returns if the given query is an ALTER TABLE * constraint check query. diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 5dbc7d3e1..521d3e406 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -513,6 +513,9 @@ CitusCleanupConnectionsAtExit(int code, Datum arg) * are already given away. */ DeallocateReservedConnections(); + + /* we don't want any monitoring view/udf to show already exited backends */ + UnSetGlobalPID(); } diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index 4e037e30c..bc54e1da9 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -90,7 +90,6 @@ static BackendData *MyBackendData = NULL; static void BackendManagementShmemInit(void); static size_t BackendManagementShmemSize(void); -static void UnSetGlobalPID(void); PG_FUNCTION_INFO_V1(assign_distributed_transaction_id); @@ -674,7 +673,7 @@ UnSetDistributedTransactionId(void) /* * UnSetGlobalPID resets the global pid for the current backend. */ -static void +void UnSetGlobalPID(void) { /* backend does not exist if the extension is not created */ diff --git a/src/include/distributed/backend_data.h b/src/include/distributed/backend_data.h index bf9a57fd1..7f3a81e88 100644 --- a/src/include/distributed/backend_data.h +++ b/src/include/distributed/backend_data.h @@ -62,6 +62,7 @@ extern void InitializeBackendData(void); extern void LockBackendSharedMemory(LWLockMode lockMode); extern void UnlockBackendSharedMemory(void); extern void UnSetDistributedTransactionId(void); +extern void UnSetGlobalPID(void); extern void AssignDistributedTransactionId(void); extern void MarkCitusInitiatedCoordinatorBackend(void); extern void AssignGlobalPID(void); diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 3648dbc1b..dd10c511d 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -139,6 +139,7 @@ extern void ExecuteQueryIntoDestReceiver(Query *query, ParamListInfo params, extern void ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params, DestReceiver *dest); extern void SetLocalMultiShardModifyModeToSequential(void); +extern void EnsureSequentialMode(ObjectType objType); extern void SetLocalForceMaxQueryParallelization(void); extern void SortTupleStore(CitusScanState *scanState); extern bool DistributedPlanModifiesDatabase(DistributedPlan *plan); diff --git a/src/test/regress/expected/alter_database_owner.out b/src/test/regress/expected/alter_database_owner.out index 2e5e54aca..1fb118242 100644 --- a/src/test/regress/expected/alter_database_owner.out +++ b/src/test/regress/expected/alter_database_owner.out @@ -165,8 +165,8 @@ SELECT count(*) FROM t; -- parallel execution; (1 row) ALTER DATABASE regression OWNER TO database_owner_2; -- should ERROR -ERROR: cannot create or modify database because there was a parallel operation on a distributed table in the transaction -DETAIL: When creating or altering a database, Citus needs to perform all operations over a single connection per node to ensure consistency. +ERROR: cannot run database command because there was a parallel operation on a distributed table in the transaction +DETAIL: When running command on/for a distributed database, Citus needs to perform all operations over a single connection per node to ensure consistency. HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';" ROLLBACK; -- list the owners of the current database on all nodes diff --git a/src/test/regress/expected/local_dist_join_mixed.out b/src/test/regress/expected/local_dist_join_mixed.out index 0e3e0aad4..5566186b5 100644 --- a/src/test/regress/expected/local_dist_join_mixed.out +++ b/src/test/regress/expected/local_dist_join_mixed.out @@ -1602,6 +1602,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c (1 row) DROP SCHEMA local_dist_join_mixed CASCADE; +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed schema is run. To make sure subsequent commands see the schema correctly we need to make sure to use only one connection for all future commands NOTICE: drop cascades to 7 other objects DETAIL: drop cascades to table distributed drop cascades to table reference diff --git a/src/test/regress/expected/multi_schema_support.out b/src/test/regress/expected/multi_schema_support.out index f7f11aa59..eb1c47401 100644 --- a/src/test/regress/expected/multi_schema_support.out +++ b/src/test/regress/expected/multi_schema_support.out @@ -1376,8 +1376,8 @@ BEGIN; (1 row) ALTER SCHEMA bar RENAME TO foo; -ERROR: cannot create or modify schema because there was a parallel operation on a distributed table in the transaction -DETAIL: When creating, altering, or dropping a schema, Citus needs to perform all operations over a single connection per node to ensure consistency. +ERROR: cannot run schema command because there was a parallel operation on a distributed table in the transaction +DETAIL: When running command on/for a distributed schema, Citus needs to perform all operations over a single connection per node to ensure consistency. HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';" ROLLBACK; BEGIN; diff --git a/src/test/regress/expected/non_colocated_subquery_joins.out b/src/test/regress/expected/non_colocated_subquery_joins.out index 8f3db8492..527f0e1c2 100644 --- a/src/test/regress/expected/non_colocated_subquery_joins.out +++ b/src/test/regress/expected/non_colocated_subquery_joins.out @@ -8,6 +8,8 @@ -- =================================================================== SET client_min_messages TO DEBUG1; CREATE SCHEMA non_colocated_subquery; +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed schema is run. To make sure subsequent commands see the schema correctly we need to make sure to use only one connection for all future commands SET search_path TO non_colocated_subquery, public; -- we don't use the data anyway CREATE TABLE users_table_local AS SELECT * FROM users_table LIMIT 0; diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index 0f94e6e7b..3b9f9f2c7 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -2011,8 +2011,28 @@ RESET citus.enable_manual_changes_to_shards ; -- these should work as expected TRUNCATE TABLE test_disabling_drop_and_truncate_102040; DROP TABLE test_disabling_drop_and_truncate_102040; -RESET citus.shard_replication_factor; DROP TABLE test_disabling_drop_and_truncate; +-- test creating distributed or reference tables from shards +CREATE TABLE test_creating_distributed_relation_table_from_shard (a int); +SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard', 'a'); +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102044, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) ');SELECT worker_apply_shard_ddl_command (102044, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102045, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) ');SELECT worker_apply_shard_ddl_command (102045, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102046, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) ');SELECT worker_apply_shard_ddl_command (102046, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102047, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) ');SELECT worker_apply_shard_ddl_command (102047, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- these should error because shards cannot be used to: +-- create distributed table +SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard_102044', 'a'); +ERROR: relation "test_creating_distributed_relation_table_from_shard_102044" is a shard relation +-- create reference table +SELECT create_reference_table('test_creating_distributed_relation_table_from_shard_102044'); +ERROR: relation "test_creating_distributed_relation_table_from_shard_102044" is a shard relation +RESET citus.shard_replication_factor; +DROP TABLE test_creating_distributed_relation_table_from_shard; -- lets flush the copy often to make sure everyhing is fine SET citus.local_copy_flush_threshold TO 1; TRUNCATE another_schema_table; diff --git a/src/test/regress/expected/single_node_truncate.out b/src/test/regress/expected/single_node_truncate.out index bf1c99d69..7d94dc744 100644 --- a/src/test/regress/expected/single_node_truncate.out +++ b/src/test/regress/expected/single_node_truncate.out @@ -31,9 +31,9 @@ SELECT * FROM table_sizes; name | has_data --------------------------------------------------------------------- citus_local | f - citus_local_102045 | t + citus_local_102049 | t ref | t - ref_102044 | t + ref_102048 | t (4 rows) -- verify that this UDF is noop on Citus local tables @@ -47,9 +47,9 @@ SELECT * FROM table_sizes; name | has_data --------------------------------------------------------------------- citus_local | f - citus_local_102045 | t + citus_local_102049 | t ref | t - ref_102044 | t + ref_102048 | t (4 rows) -- test that we allow cascading truncates to citus local tables @@ -65,9 +65,9 @@ SELECT * FROM table_sizes; name | has_data --------------------------------------------------------------------- citus_local | f - citus_local_102045 | t + citus_local_102049 | t ref | f - ref_102044 | t + ref_102048 | t (4 rows) ROLLBACK; @@ -98,14 +98,14 @@ SELECT * FROM table_sizes; name | has_data --------------------------------------------------------------------- citus_local | f - citus_local_102045 | t + citus_local_102049 | t dist | f - dist_102047 | t - dist_102048 | t - dist_102049 | t - dist_102050 | t + dist_102051 | t + dist_102052 | t + dist_102053 | t + dist_102054 | t ref | f - ref_102044 | t + ref_102048 | t (9 rows) ROLLBACK; @@ -121,14 +121,14 @@ SELECT * FROM table_sizes; name | has_data --------------------------------------------------------------------- citus_local | f - citus_local_102045 | t + citus_local_102049 | t dist | f - dist_102047 | t - dist_102048 | t - dist_102049 | t - dist_102050 | t + dist_102051 | t + dist_102052 | t + dist_102053 | t + dist_102054 | t ref | t - ref_102044 | t + ref_102048 | t (9 rows) ROLLBACK; diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 13659f2eb..3ca456108 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -1017,9 +1017,22 @@ RESET citus.enable_manual_changes_to_shards ; TRUNCATE TABLE test_disabling_drop_and_truncate_102040; DROP TABLE test_disabling_drop_and_truncate_102040; -RESET citus.shard_replication_factor; DROP TABLE test_disabling_drop_and_truncate; +-- test creating distributed or reference tables from shards +CREATE TABLE test_creating_distributed_relation_table_from_shard (a int); +SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard', 'a'); + +-- these should error because shards cannot be used to: +-- create distributed table +SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard_102044', 'a'); + +-- create reference table +SELECT create_reference_table('test_creating_distributed_relation_table_from_shard_102044'); + +RESET citus.shard_replication_factor; +DROP TABLE test_creating_distributed_relation_table_from_shard; + -- lets flush the copy often to make sure everyhing is fine SET citus.local_copy_flush_threshold TO 1; TRUNCATE another_schema_table;