From ed107835cb002835c2006a004348aa18d2ac0b60 Mon Sep 17 00:00:00 2001 From: gledis69 Date: Mon, 7 Feb 2022 18:34:39 +0300 Subject: [PATCH 1/7] Updates a few details in Contributing.md * Adds installation of `mitmproxy`. I was getting this error from running regression tests: ``` Can't exec "mitmdump": No such file or directory at /home/glediszeneli/citus/src/test/regress/pg_regress_multi.pl line 215. ``` * Calls `install-all` in the setup. Without `install-all` the `multi_extension` regression test failed. --- CONTRIBUTING.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 7777554b2..8e85fbbba 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -31,7 +31,7 @@ why we ask this as well as instructions for how to proceed, see the cd citus ./configure make - make install + make install install-all cd src/test/regress make check ``` @@ -51,7 +51,7 @@ why we ask this as well as instructions for how to proceed, see the autoconf flex git libcurl4-gnutls-dev libicu-dev \ libkrb5-dev liblz4-dev libpam0g-dev libreadline-dev \ libselinux1-dev libssl-dev libxslt1-dev libzstd-dev \ - make uuid-dev + make uuid-dev mitmproxy ``` 2. 
Get, build, and test the code From 4c2a0f0aa0aa3193399202f680bdd718bcda8e9e Mon Sep 17 00:00:00 2001 From: gledis69 Date: Wed, 9 Feb 2022 22:00:39 +0300 Subject: [PATCH 2/7] Removing install-all, but adding a comment about it --- CONTRIBUTING.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 8e85fbbba..692f0c743 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -31,7 +31,7 @@ why we ask this as well as instructions for how to proceed, see the cd citus ./configure make - make install install-all + make install cd src/test/regress make check ``` @@ -62,6 +62,8 @@ why we ask this as well as instructions for how to proceed, see the ./configure make sudo make install + # Optionally, you might instead want to use `sudo make install-all` + # since `multi_extension` test would fail due to missing downgrade scripts. cd src/test/regress make check ``` From 49c594a55075f46a82a38a9f749a50f67100b1c7 Mon Sep 17 00:00:00 2001 From: gledis69 Date: Fri, 11 Feb 2022 14:17:22 +0300 Subject: [PATCH 3/7] Adding install-all comment to all OS-es --- CONTRIBUTING.md | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 692f0c743..47d628034 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -32,6 +32,8 @@ why we ask this as well as instructions for how to proceed, see the ./configure make make install + # Optionally, you might instead want to use `sudo make install-all` + # since `multi_extension` regression test would fail due to missing downgrade scripts. cd src/test/regress make check ``` @@ -63,7 +65,7 @@ why we ask this as well as instructions for how to proceed, see the make sudo make install # Optionally, you might instead want to use `sudo make install-all` - # since `multi_extension` test would fail due to missing downgrade scripts. + # since `multi_extension` regression test would fail due to missing downgrade scripts. 
cd src/test/regress make check ``` @@ -106,6 +108,8 @@ why we ask this as well as instructions for how to proceed, see the PG_CONFIG=/usr/pgsql-14/bin/pg_config ./configure make sudo make install + # Optionally, you might instead want to use `sudo make install-all` + # since `multi_extension` regression test would fail due to missing downgrade scripts. cd src/test/regress make check ``` @@ -127,6 +131,8 @@ cd build cmake .. make -j5 sudo make install +# Optionally, you might instead want to use `sudo make install-all` +# since `multi_extension` regression test would fail due to missing downgrade scripts. cd ../.. git clone https://github.com/citusdata/tools.git From 5478bd0105421982664186b68005a6e0271798bb Mon Sep 17 00:00:00 2001 From: gledis69 Date: Fri, 11 Feb 2022 14:23:43 +0300 Subject: [PATCH 4/7] Remove extra sudo in comment for Mac --- CONTRIBUTING.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 47d628034..ae8d8c625 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -32,7 +32,7 @@ why we ask this as well as instructions for how to proceed, see the ./configure make make install - # Optionally, you might instead want to use `sudo make install-all` + # Optionally, you might instead want to use `make install-all` # since `multi_extension` regression test would fail due to missing downgrade scripts. cd src/test/regress make check From badfd561b2a3ee408a11478781e1bf82135aa7f0 Mon Sep 17 00:00:00 2001 From: Gledis Zeneli <43916939+gledis69@users.noreply.github.com> Date: Mon, 14 Feb 2022 16:06:48 +0300 Subject: [PATCH 5/7] Prevent Citus table functions from being called on shards (Fix #5610) (#5694) DESCRIPTION: Prevent Citus table functions from being called on shards The operations that guard against using shards are: * Create Local Table * Create distributed table (which affects reference table creation as well). 
* I used `ErrorIfRelationIsAKnownShard` instead of `ErrorIfIllegallyChangingKnownShard`. `ErrorIfIllegallyChangingKnownShard` allows the operation if `citus.enable_manual_changes_to_shards` is enabled, but I am not sure if it ever makes sense to create a distributed, reference, or citus local table out of a shard. I tried to go over the code to identify other UDFs where shards could be illegally changed, but I could not find any others. My knowledge of the codebase is not solid enough for me to say for sure. Fixes #5610 --- .../commands/create_distributed_table.c | 10 ++++++ src/test/regress/expected/single_node.out | 22 +++++++++++- .../regress/expected/single_node_truncate.out | 36 +++++++++---------- src/test/regress/sql/single_node.sql | 15 +++++++- 4 files changed, 63 insertions(+), 20 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 43bd1ebd6..eb51195bf 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -60,6 +60,7 @@ #include "distributed/remote_commands.h" #include "distributed/shared_library_init.h" #include "distributed/worker_protocol.h" +#include "distributed/worker_shard_visibility.h" #include "distributed/worker_transaction.h" #include "distributed/version_compat.h" #include "executor/executor.h" @@ -327,6 +328,7 @@ create_reference_table(PG_FUNCTION_ARGS) * - we are on the coordinator * - the current user is the owner of the table * - relation kind is supported + * - relation is not a shard */ static void EnsureCitusTableCanBeCreated(Oid relationOid) @@ -343,6 +345,14 @@ EnsureCitusTableCanBeCreated(Oid relationOid) * will be performed in CreateDistributedTable. */ EnsureRelationKindSupported(relationOid); + + /* + * When coordinator is added to the metadata, or on the workers, + * some of the relations of the coordinator node may/will be shards. 
+ * We disallow creating distributed tables from shard relations, by + * erroring out here. + */ + ErrorIfRelationIsAKnownShard(relationOid); } diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index 0f94e6e7b..3b9f9f2c7 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -2011,8 +2011,28 @@ RESET citus.enable_manual_changes_to_shards ; -- these should work as expected TRUNCATE TABLE test_disabling_drop_and_truncate_102040; DROP TABLE test_disabling_drop_and_truncate_102040; -RESET citus.shard_replication_factor; DROP TABLE test_disabling_drop_and_truncate; +-- test creating distributed or reference tables from shards +CREATE TABLE test_creating_distributed_relation_table_from_shard (a int); +SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard', 'a'); +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102044, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) ');SELECT worker_apply_shard_ddl_command (102044, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102045, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) ');SELECT worker_apply_shard_ddl_command (102045, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (102046, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) ');SELECT worker_apply_shard_ddl_command (102046, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') +NOTICE: executing the command locally: 
SELECT worker_apply_shard_ddl_command (102047, 'single_node', 'CREATE TABLE single_node.test_creating_distributed_relation_table_from_shard (a integer) ');SELECT worker_apply_shard_ddl_command (102047, 'single_node', 'ALTER TABLE single_node.test_creating_distributed_relation_table_from_shard OWNER TO postgres') + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- these should error because shards cannot be used to: +-- create distributed table +SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard_102044', 'a'); +ERROR: relation "test_creating_distributed_relation_table_from_shard_102044" is a shard relation +-- create reference table +SELECT create_reference_table('test_creating_distributed_relation_table_from_shard_102044'); +ERROR: relation "test_creating_distributed_relation_table_from_shard_102044" is a shard relation +RESET citus.shard_replication_factor; +DROP TABLE test_creating_distributed_relation_table_from_shard; -- lets flush the copy often to make sure everyhing is fine SET citus.local_copy_flush_threshold TO 1; TRUNCATE another_schema_table; diff --git a/src/test/regress/expected/single_node_truncate.out b/src/test/regress/expected/single_node_truncate.out index bf1c99d69..7d94dc744 100644 --- a/src/test/regress/expected/single_node_truncate.out +++ b/src/test/regress/expected/single_node_truncate.out @@ -31,9 +31,9 @@ SELECT * FROM table_sizes; name | has_data --------------------------------------------------------------------- citus_local | f - citus_local_102045 | t + citus_local_102049 | t ref | t - ref_102044 | t + ref_102048 | t (4 rows) -- verify that this UDF is noop on Citus local tables @@ -47,9 +47,9 @@ SELECT * FROM table_sizes; name | has_data --------------------------------------------------------------------- citus_local | f - citus_local_102045 | t + citus_local_102049 | t ref | t - ref_102044 | t + ref_102048 | t (4 rows) -- test that we 
allow cascading truncates to citus local tables @@ -65,9 +65,9 @@ SELECT * FROM table_sizes; name | has_data --------------------------------------------------------------------- citus_local | f - citus_local_102045 | t + citus_local_102049 | t ref | f - ref_102044 | t + ref_102048 | t (4 rows) ROLLBACK; @@ -98,14 +98,14 @@ SELECT * FROM table_sizes; name | has_data --------------------------------------------------------------------- citus_local | f - citus_local_102045 | t + citus_local_102049 | t dist | f - dist_102047 | t - dist_102048 | t - dist_102049 | t - dist_102050 | t + dist_102051 | t + dist_102052 | t + dist_102053 | t + dist_102054 | t ref | f - ref_102044 | t + ref_102048 | t (9 rows) ROLLBACK; @@ -121,14 +121,14 @@ SELECT * FROM table_sizes; name | has_data --------------------------------------------------------------------- citus_local | f - citus_local_102045 | t + citus_local_102049 | t dist | f - dist_102047 | t - dist_102048 | t - dist_102049 | t - dist_102050 | t + dist_102051 | t + dist_102052 | t + dist_102053 | t + dist_102054 | t ref | t - ref_102044 | t + ref_102048 | t (9 rows) ROLLBACK; diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 13659f2eb..3ca456108 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -1017,9 +1017,22 @@ RESET citus.enable_manual_changes_to_shards ; TRUNCATE TABLE test_disabling_drop_and_truncate_102040; DROP TABLE test_disabling_drop_and_truncate_102040; -RESET citus.shard_replication_factor; DROP TABLE test_disabling_drop_and_truncate; +-- test creating distributed or reference tables from shards +CREATE TABLE test_creating_distributed_relation_table_from_shard (a int); +SELECT create_distributed_table('test_creating_distributed_relation_table_from_shard', 'a'); + +-- these should error because shards cannot be used to: +-- create distributed table +SELECT 
create_distributed_table('test_creating_distributed_relation_table_from_shard_102044', 'a'); + +-- create reference table +SELECT create_reference_table('test_creating_distributed_relation_table_from_shard_102044'); + +RESET citus.shard_replication_factor; +DROP TABLE test_creating_distributed_relation_table_from_shard; + -- lets flush the copy often to make sure everyhing is fine SET citus.local_copy_flush_threshold TO 1; TRUNCATE another_schema_table; From 0411a98c99f5bfe775a3c89e29c5863d2aa861d1 Mon Sep 17 00:00:00 2001 From: Ahmet Gedemenli Date: Mon, 14 Feb 2022 18:38:21 +0300 Subject: [PATCH 6/7] Refactor EnsureSequentialMode functions (#5704) --- src/backend/distributed/commands/collation.c | 50 +--------- src/backend/distributed/commands/database.c | 39 +------- src/backend/distributed/commands/extension.c | 47 +--------- src/backend/distributed/commands/function.c | 51 ++-------- src/backend/distributed/commands/schema.c | 45 +-------- src/backend/distributed/commands/type.c | 60 ++---------- .../distributed/executor/multi_executor.c | 94 +++++++++++++++++++ src/include/distributed/multi_executor.h | 1 + .../regress/expected/alter_database_owner.out | 4 +- .../expected/forcedelegation_functions.out | 34 +++---- .../expected/local_dist_join_mixed.out | 2 + .../expected/multi_function_in_join.out | 2 +- .../expected/multi_function_in_join_0.out | 2 +- .../expected/multi_mx_add_coordinator.out | 2 +- src/test/regress/expected/multi_mx_call.out | 6 +- src/test/regress/expected/multi_mx_call_0.out | 6 +- .../multi_mx_function_call_delegation.out | 10 +- .../multi_mx_function_call_delegation_0.out | 10 +- .../regress/expected/multi_schema_support.out | 4 +- .../expected/non_colocated_subquery_joins.out | 2 + 20 files changed, 167 insertions(+), 304 deletions(-) diff --git a/src/backend/distributed/commands/collation.c b/src/backend/distributed/commands/collation.c index ddbe63f52..d712c18ab 100644 --- a/src/backend/distributed/commands/collation.c +++ 
b/src/backend/distributed/commands/collation.c @@ -37,7 +37,6 @@ static char * CreateCollationDDLInternal(Oid collationId, Oid *collowner, char **quotedCollationName); static List * FilterNameListForDistributedCollations(List *objects, bool missing_ok, List **addresses); -static void EnsureSequentialModeForCollationDDL(void); /* @@ -256,7 +255,7 @@ PreprocessDropCollationStmt(Node *node, const char *queryString, char *dropStmtSql = DeparseTreeNode((Node *) stmt); stmt->objects = oldCollations; - EnsureSequentialModeForCollationDDL(); + EnsureSequentialMode(OBJECT_COLLATION); /* to prevent recursion with mx we disable ddl propagation */ List *commands = list_make3(DISABLE_DDL_PROPAGATION, @@ -292,7 +291,7 @@ PreprocessAlterCollationOwnerStmt(Node *node, const char *queryString, QualifyTreeNode((Node *) stmt); char *sql = DeparseTreeNode((Node *) stmt); - EnsureSequentialModeForCollationDDL(); + EnsureSequentialMode(OBJECT_COLLATION); List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, ENABLE_DDL_PROPAGATION); @@ -328,7 +327,7 @@ PreprocessRenameCollationStmt(Node *node, const char *queryString, /* deparse sql*/ char *renameStmtSql = DeparseTreeNode((Node *) stmt); - EnsureSequentialModeForCollationDDL(); + EnsureSequentialMode(OBJECT_COLLATION); /* to prevent recursion with mx we disable ddl propagation */ List *commands = list_make3(DISABLE_DDL_PROPAGATION, @@ -363,7 +362,7 @@ PreprocessAlterCollationSchemaStmt(Node *node, const char *queryString, QualifyTreeNode((Node *) stmt); char *sql = DeparseTreeNode((Node *) stmt); - EnsureSequentialModeForCollationDDL(); + EnsureSequentialMode(OBJECT_COLLATION); List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, @@ -453,47 +452,6 @@ AlterCollationSchemaStmtObjectAddress(Node *node, bool missing_ok) } -/* - * EnsureSequentialModeForCollationDDL makes sure that the current transaction is already in - * sequential mode, or can still safely be put in sequential mode, it errors if that is - * not 
possible. The error contains information for the user to retry the transaction with - * sequential mode set from the beginning. - * - * As collations are node scoped objects there exists only 1 instance of the collation used by - * potentially multiple shards. To make sure all shards in the transaction can interact - * with the type the type needs to be visible on all connections used by the transaction, - * meaning we can only use 1 connection per node. - */ -static void -EnsureSequentialModeForCollationDDL(void) -{ - if (!IsTransactionBlock()) - { - /* we do not need to switch to sequential mode if we are not in a transaction */ - return; - } - - if (ParallelQueryExecutedInTransaction()) - { - ereport(ERROR, (errmsg("cannot create or modify collation because there was a " - "parallel operation on a distributed table in the " - "transaction"), - errdetail("When creating or altering a collation, Citus needs to " - "perform all operations over a single connection per " - "node to ensure consistency."), - errhint("Try re-running the transaction with " - "\"SET LOCAL citus.multi_shard_modify_mode TO " - "\'sequential\';\""))); - } - - ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), - errdetail("Collation is created or altered. To make sure subsequent " - "commands see the collation correctly we need to make sure to " - "use only one connection for all future commands"))); - SetLocalMultiShardModifyModeToSequential(); -} - - /* * GenerateBackupNameForCollationCollision generates a new collation name for an existing collation. 
* The name is generated in such a way that the new name doesn't overlap with an existing collation diff --git a/src/backend/distributed/commands/database.c b/src/backend/distributed/commands/database.c index b4fb15110..59902b038 100644 --- a/src/backend/distributed/commands/database.c +++ b/src/backend/distributed/commands/database.c @@ -29,7 +29,6 @@ #include "distributed/relation_access_tracking.h" #include "distributed/worker_transaction.h" -static void EnsureSequentialModeForDatabaseDDL(void); static AlterOwnerStmt * RecreateAlterDatabaseOwnerStmt(Oid databaseOid); static Oid get_database_owner(Oid db_oid); @@ -66,7 +65,7 @@ PreprocessAlterDatabaseOwnerStmt(Node *node, const char *queryString, QualifyTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt); - EnsureSequentialModeForDatabaseDDL(); + EnsureSequentialMode(OBJECT_DATABASE); List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, ENABLE_DDL_PROPAGATION); @@ -177,39 +176,3 @@ get_database_owner(Oid db_oid) return dba; } - - -/* - * EnsureSequentialModeForDatabaseDDL makes sure that the current transaction is already - * in sequential mode, or can still safely be put in sequential mode, it errors if that is - * not possible. The error contains information for the user to retry the transaction with - * sequential mode set from the beginning. 
- */ -static void -EnsureSequentialModeForDatabaseDDL(void) -{ - if (!IsTransactionBlock()) - { - /* we do not need to switch to sequential mode if we are not in a transaction */ - return; - } - - if (ParallelQueryExecutedInTransaction()) - { - ereport(ERROR, (errmsg("cannot create or modify database because there was a " - "parallel operation on a distributed table in the " - "transaction"), - errdetail("When creating or altering a database, Citus needs to " - "perform all operations over a single connection per " - "node to ensure consistency."), - errhint("Try re-running the transaction with " - "\"SET LOCAL citus.multi_shard_modify_mode TO " - "\'sequential\';\""))); - } - - ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), - errdetail("Database is created or altered. To make sure subsequent " - "commands see the type correctly we need to make sure to " - "use only one connection for all future commands"))); - SetLocalMultiShardModifyModeToSequential(); -} diff --git a/src/backend/distributed/commands/extension.c b/src/backend/distributed/commands/extension.c index ef5b6c1b1..dc1363a65 100644 --- a/src/backend/distributed/commands/extension.c +++ b/src/backend/distributed/commands/extension.c @@ -37,7 +37,6 @@ static void AddSchemaFieldIfMissing(CreateExtensionStmt *stmt); static List * FilterDistributedExtensions(List *extensionObjectList); static List * ExtensionNameListToObjectAddressList(List *extensionObjectList); static void MarkExistingObjectDependenciesDistributedIfSupported(void); -static void EnsureSequentialModeForExtensionDDL(void); static bool ShouldPropagateExtensionCommand(Node *parseTree); static bool IsAlterExtensionSetSchemaCitus(Node *parseTree); static Node * RecreateExtensionStmt(Oid extensionOid); @@ -163,7 +162,7 @@ PostprocessCreateExtensionStmt(Node *node, const char *queryString) * Make sure that the current transaction is already in sequential mode, * or can still safely be put in sequential mode */ - 
EnsureSequentialModeForExtensionDDL(); + EnsureSequentialMode(OBJECT_EXTENSION); /* * Here we append "schema" field to the "options" list (if not specified) @@ -274,7 +273,7 @@ PreprocessDropExtensionStmt(Node *node, const char *queryString, * Make sure that the current transaction is already in sequential mode, * or can still safely be put in sequential mode */ - EnsureSequentialModeForExtensionDDL(); + EnsureSequentialMode(OBJECT_EXTENSION); List *distributedExtensionAddresses = ExtensionNameListToObjectAddressList( distributedExtensions); @@ -409,7 +408,7 @@ PreprocessAlterExtensionSchemaStmt(Node *node, const char *queryString, * Make sure that the current transaction is already in sequential mode, * or can still safely be put in sequential mode */ - EnsureSequentialModeForExtensionDDL(); + EnsureSequentialMode(OBJECT_EXTENSION); const char *alterExtensionStmtSql = DeparseTreeNode(node); @@ -478,7 +477,7 @@ PreprocessAlterExtensionUpdateStmt(Node *node, const char *queryString, * Make sure that the current transaction is already in sequential mode, * or can still safely be put in sequential mode */ - EnsureSequentialModeForExtensionDDL(); + EnsureSequentialMode(OBJECT_EXTENSION); const char *alterExtensionStmtSql = DeparseTreeNode((Node *) alterExtensionStmt); @@ -603,44 +602,6 @@ PreprocessAlterExtensionContentsStmt(Node *node, const char *queryString, } -/* - * EnsureSequentialModeForExtensionDDL makes sure that the current transaction is already in - * sequential mode, or can still safely be put in sequential mode, it errors if that is - * not possible. The error contains information for the user to retry the transaction with - * sequential mode set from the beginning. - * - * As extensions are node scoped objects there exists only 1 instance of the - * extension used by potentially multiple shards. 
To make sure all shards in - * the transaction can interact with the extension the extension needs to be - * visible on all connections used by the transaction, meaning we can only use - * 1 connection per node. - */ -static void -EnsureSequentialModeForExtensionDDL(void) -{ - if (ParallelQueryExecutedInTransaction()) - { - ereport(ERROR, (errmsg("cannot run extension command because there was a " - "parallel operation on a distributed table in the " - "transaction"), - errdetail( - "When running command on/for a distributed extension, Citus needs to " - "perform all operations over a single connection per " - "node to ensure consistency."), - errhint("Try re-running the transaction with " - "\"SET LOCAL citus.multi_shard_modify_mode TO " - "\'sequential\';\""))); - } - - ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), - errdetail( - "A command for a distributed extension is run. To make sure subsequent " - "commands see the type correctly we need to make sure to " - "use only one connection for all future commands"))); - SetLocalMultiShardModifyModeToSequential(); -} - - /* * ShouldPropagateExtensionCommand determines whether to propagate an extension * command to the worker nodes. 
diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index ab03fe77a..95b11fd0c 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -77,7 +77,6 @@ static int GetFunctionColocationId(Oid functionOid, char *colocateWithName, Oid static void EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid distributionColumnType, Oid sourceRelationId); -static void EnsureSequentialModeForFunctionDDL(void); static bool ShouldPropagateCreateFunction(CreateFunctionStmt *stmt); static bool ShouldPropagateAlterFunction(const ObjectAddress *address); static bool ShouldAddFunctionSignature(FunctionParameterMode mode); @@ -207,7 +206,7 @@ create_distributed_function(PG_FUNCTION_ARGS) * when we allow propagation within a transaction block we should make sure * to only allow this in sequential mode. */ - EnsureSequentialModeForFunctionDDL(); + EnsureSequentialMode(OBJECT_FUNCTION); EnsureDependenciesExistOnAllNodes(&functionAddress); @@ -1169,42 +1168,6 @@ GetAggregateDDLCommand(const RegProcedure funcOid, bool useCreateOrReplace) } -/* - * EnsureSequentialModeForFunctionDDL makes sure that the current transaction is already in - * sequential mode, or can still safely be put in sequential mode, it errors if that is - * not possible. The error contains information for the user to retry the transaction with - * sequential mode set from the beginning. - * - * As functions are node scoped objects there exists only 1 instance of the function used by - * potentially multiple shards. To make sure all shards in the transaction can interact - * with the function the function needs to be visible on all connections used by the transaction, - * meaning we can only use 1 connection per node. 
- */ -static void -EnsureSequentialModeForFunctionDDL(void) -{ - if (ParallelQueryExecutedInTransaction()) - { - ereport(ERROR, (errmsg("cannot create function because there was a " - "parallel operation on a distributed table in the " - "transaction"), - errdetail("When creating a distributed function, Citus needs to " - "perform all operations over a single connection per " - "node to ensure consistency."), - errhint("Try re-running the transaction with " - "\"SET LOCAL citus.multi_shard_modify_mode TO " - "\'sequential\';\""))); - } - - ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), - errdetail( - "A distributed function is created. To make sure subsequent " - "commands see the type correctly we need to make sure to " - "use only one connection for all future commands"))); - SetLocalMultiShardModifyModeToSequential(); -} - - /* * ShouldPropagateCreateFunction tests if we need to propagate a CREATE FUNCTION * statement. We only propagate replace's of distributed functions to keep the function on @@ -1313,7 +1276,7 @@ PreprocessCreateFunctionStmt(Node *node, const char *queryString, EnsureCoordinator(); - EnsureSequentialModeForFunctionDDL(); + EnsureSequentialMode(OBJECT_FUNCTION); /* * ddl jobs will be generated during the Processing phase as we need the function to @@ -1432,7 +1395,7 @@ PreprocessAlterFunctionStmt(Node *node, const char *queryString, EnsureCoordinator(); ErrorIfUnsupportedAlterFunctionStmt(stmt); - EnsureSequentialModeForFunctionDDL(); + EnsureSequentialMode(OBJECT_FUNCTION); QualifyTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt); @@ -1466,7 +1429,7 @@ PreprocessRenameFunctionStmt(Node *node, const char *queryString, } EnsureCoordinator(); - EnsureSequentialModeForFunctionDDL(); + EnsureSequentialMode(OBJECT_FUNCTION); QualifyTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt); @@ -1498,7 +1461,7 @@ PreprocessAlterFunctionSchemaStmt(Node *node, const char *queryString, } 
EnsureCoordinator(); - EnsureSequentialModeForFunctionDDL(); + EnsureSequentialMode(OBJECT_FUNCTION); QualifyTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt); @@ -1531,7 +1494,7 @@ PreprocessAlterFunctionOwnerStmt(Node *node, const char *queryString, } EnsureCoordinator(); - EnsureSequentialModeForFunctionDDL(); + EnsureSequentialMode(OBJECT_FUNCTION); QualifyTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt); @@ -1621,7 +1584,7 @@ PreprocessDropFunctionStmt(Node *node, const char *queryString, * types, so we block the call. */ EnsureCoordinator(); - EnsureSequentialModeForFunctionDDL(); + EnsureSequentialMode(OBJECT_FUNCTION); /* remove the entries for the distributed objects on dropping */ ObjectAddress *address = NULL; diff --git a/src/backend/distributed/commands/schema.c b/src/backend/distributed/commands/schema.c index adb26b5e7..e0c0f2b11 100644 --- a/src/backend/distributed/commands/schema.c +++ b/src/backend/distributed/commands/schema.c @@ -42,7 +42,6 @@ static ObjectAddress GetObjectAddressBySchemaName(char *schemaName, bool missing_ok); static List * FilterDistributedSchemas(List *schemas); -static void EnsureSequentialModeForSchemaDDL(void); static bool SchemaHasDistributedTableWithFKey(char *schemaName); static bool ShouldPropagateCreateSchemaStmt(void); @@ -62,7 +61,7 @@ PreprocessCreateSchemaStmt(Node *node, const char *queryString, EnsureCoordinator(); - EnsureSequentialModeForSchemaDDL(); + EnsureSequentialMode(OBJECT_SCHEMA); /* deparse sql*/ const char *sql = DeparseTreeNode(node); @@ -101,7 +100,7 @@ PreprocessDropSchemaStmt(Node *node, const char *queryString, return NIL; } - EnsureSequentialModeForSchemaDDL(); + EnsureSequentialMode(OBJECT_SCHEMA); Value *schemaVal = NULL; foreach_ptr(schemaVal, distributedSchemas) @@ -204,7 +203,7 @@ PreprocessAlterSchemaRenameStmt(Node *node, const char *queryString, /* deparse sql*/ const char *renameStmtSql = DeparseTreeNode(node); - 
EnsureSequentialModeForSchemaDDL(); + EnsureSequentialMode(OBJECT_SCHEMA); /* to prevent recursion with mx we disable ddl propagation */ List *commands = list_make3(DISABLE_DDL_PROPAGATION, @@ -293,44 +292,6 @@ FilterDistributedSchemas(List *schemas) } -/* - * EnsureSequentialModeForSchemaDDL makes sure that the current transaction is already in - * sequential mode, or can still safely be put in sequential mode, it errors if that is - * not possible. The error contains information for the user to retry the transaction with - * sequential mode set from the beginning. - * - * Copy-pasted from type.c - */ -static void -EnsureSequentialModeForSchemaDDL(void) -{ - if (!IsTransactionBlock()) - { - /* we do not need to switch to sequential mode if we are not in a transaction */ - return; - } - - if (ParallelQueryExecutedInTransaction()) - { - ereport(ERROR, (errmsg("cannot create or modify schema because there was a " - "parallel operation on a distributed table in the " - "transaction"), - errdetail("When creating, altering, or dropping a schema, Citus " - "needs to perform all operations over a single " - "connection per node to ensure consistency."), - errhint("Try re-running the transaction with " - "\"SET LOCAL citus.multi_shard_modify_mode TO " - "\'sequential\';\""))); - } - - ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), - errdetail("Schema is created or altered. To make sure subsequent " - "commands see the schema correctly we need to make sure to " - "use only one connection for all future commands"))); - SetLocalMultiShardModifyModeToSequential(); -} - - /* * SchemaHasDistributedTableWithFKey takes a schema name and scans the relations within * that schema. 
If any one of the relations has a foreign key relationship, it returns diff --git a/src/backend/distributed/commands/type.c b/src/backend/distributed/commands/type.c index 78b56c2ff..524124285 100644 --- a/src/backend/distributed/commands/type.c +++ b/src/backend/distributed/commands/type.c @@ -92,7 +92,6 @@ bool EnableCreateTypePropagation = true; static List * FilterNameListForDistributedTypes(List *objects, bool missing_ok); static List * TypeNameListToObjectAddresses(List *objects); static TypeName * MakeTypeNameFromRangeVar(const RangeVar *relation); -static void EnsureSequentialModeForTypeDDL(void); static Oid GetTypeOwner(Oid typeOid); /* recreate functions */ @@ -158,7 +157,7 @@ PreprocessCompositeTypeStmt(Node *node, const char *queryString, * when we allow propagation within a transaction block we should make sure to only * allow this in sequential mode */ - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) compositeTypeStmtSql, @@ -223,7 +222,7 @@ PreprocessAlterTypeStmt(Node *node, const char *queryString, * regardless if in a transaction or not. If we would not propagate the alter * statement the types would be different on worker and coordinator. */ - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) alterTypeStmtSql, @@ -266,7 +265,7 @@ PreprocessCreateEnumStmt(Node *node, const char *queryString, * when we allow propagation within a transaction block we should make sure to only * allow this in sequential mode */ - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); /* to prevent recursion with mx we disable ddl propagation */ List *commands = list_make3(DISABLE_DDL_PROPAGATION, @@ -325,7 +324,7 @@ PreprocessAlterEnumStmt(Node *node, const char *queryString, * (adding values to an enum can not run in a transaction anyway and would error by * postgres already). 
*/ - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); /* * managing types can only be done on the coordinator if ddl propagation is on. when @@ -405,7 +404,7 @@ PreprocessDropTypeStmt(Node *node, const char *queryString, char *dropStmtSql = DeparseTreeNode((Node *) stmt); stmt->objects = oldTypes; - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); /* to prevent recursion with mx we disable ddl propagation */ List *commands = list_make3(DISABLE_DDL_PROPAGATION, @@ -442,7 +441,7 @@ PreprocessRenameTypeStmt(Node *node, const char *queryString, /* deparse sql*/ const char *renameStmtSql = DeparseTreeNode(node); - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); /* to prevent recursion with mx we disable ddl propagation */ List *commands = list_make3(DISABLE_DDL_PROPAGATION, @@ -480,7 +479,7 @@ PreprocessRenameTypeAttributeStmt(Node *node, const char *queryString, const char *sql = DeparseTreeNode((Node *) stmt); - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, ENABLE_DDL_PROPAGATION); @@ -513,7 +512,7 @@ PreprocessAlterTypeSchemaStmt(Node *node, const char *queryString, QualifyTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt); - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, @@ -572,7 +571,7 @@ PreprocessAlterTypeOwnerStmt(Node *node, const char *queryString, QualifyTreeNode((Node *) stmt); const char *sql = DeparseTreeNode((Node *) stmt); - EnsureSequentialModeForTypeDDL(); + EnsureSequentialMode(OBJECT_TYPE); List *commands = list_make3(DISABLE_DDL_PROPAGATION, (void *) sql, ENABLE_DDL_PROPAGATION); @@ -1116,47 +1115,6 @@ MakeTypeNameFromRangeVar(const RangeVar *relation) } -/* - * EnsureSequentialModeForTypeDDL makes sure that the current transaction is already in - * sequential mode, or can still 
safely be put in sequential mode, it errors if that is - * not possible. The error contains information for the user to retry the transaction with - * sequential mode set from the beginning. - * - * As types are node scoped objects there exists only 1 instance of the type used by - * potentially multiple shards. To make sure all shards in the transaction can interact - * with the type the type needs to be visible on all connections used by the transaction, - * meaning we can only use 1 connection per node. - */ -static void -EnsureSequentialModeForTypeDDL(void) -{ - if (!IsTransactionBlock()) - { - /* we do not need to switch to sequential mode if we are not in a transaction */ - return; - } - - if (ParallelQueryExecutedInTransaction()) - { - ereport(ERROR, (errmsg("cannot create or modify type because there was a " - "parallel operation on a distributed table in the " - "transaction"), - errdetail("When creating or altering a type, Citus needs to " - "perform all operations over a single connection per " - "node to ensure consistency."), - errhint("Try re-running the transaction with " - "\"SET LOCAL citus.multi_shard_modify_mode TO " - "\'sequential\';\""))); - } - - ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), - errdetail("Type is created or altered. To make sure subsequent " - "commands see the type correctly we need to make sure to " - "use only one connection for all future commands"))); - SetLocalMultiShardModifyModeToSequential(); -} - - /* * ShouldPropagateTypeCreate returns if we should propagate the creation of a type. 
* diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index a101d2968..cc7f8d3ac 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -32,6 +32,7 @@ #include "distributed/distributed_planner.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_server_executor.h" +#include "distributed/relation_access_tracking.h" #include "distributed/resource_lock.h" #include "distributed/transaction_management.h" #include "distributed/version_compat.h" @@ -81,6 +82,7 @@ int ExecutorLevel = 0; /* local function forward declarations */ static Relation StubRelation(TupleDesc tupleDescriptor); +static char * GetObjectTypeString(ObjectType objType); static bool AlterTableConstraintCheck(QueryDesc *queryDesc); static List * FindCitusCustomScanStates(PlanState *planState); static bool CitusCustomScanStateWalker(PlanState *planState, @@ -691,6 +693,98 @@ SetLocalMultiShardModifyModeToSequential() } +/* + * EnsureSequentialMode makes sure that the current transaction is already in + * sequential mode, or can still safely be put in sequential mode, it errors if that is + * not possible. The error contains information for the user to retry the transaction with + * sequential mode set from the beginning. + * + * Takes an ObjectType to use in the error/debug messages. 
+ */
+void
+EnsureSequentialMode(ObjectType objType)
+{
+	char *objTypeString = GetObjectTypeString(objType);
+
+	if (ParallelQueryExecutedInTransaction())
+	{
+		ereport(ERROR, (errmsg("cannot run %s command because there was a "
+		                       "parallel operation on a distributed table in the "
+		                       "transaction", objTypeString),
+		                errdetail("When running command on/for a distributed %s, Citus "
+		                          "needs to perform all operations over a single "
+		                          "connection per node to ensure consistency.",
+		                          objTypeString),
+		                errhint("Try re-running the transaction with "
+		                        "\"SET LOCAL citus.multi_shard_modify_mode TO "
+		                        "\'sequential\';\"")));
+	}
+
+	ereport(DEBUG1, (errmsg("switching to sequential query execution mode"),
+	                 errdetail(
+	                     "A command for a distributed %s is run. To make sure subsequent "
+	                     "commands see the %s correctly we need to make sure to "
+	                     "use only one connection for all future commands",
+	                     objTypeString, objTypeString)));
+
+	SetLocalMultiShardModifyModeToSequential();
+}
+
+
+/*
+ * GetObjectTypeString takes an ObjectType and returns the string version of it.
+ * We (for now) call this function only in EnsureSequentialMode, and use the returned
+ * string to generate error/debug messages.
+ *
+ * If GetObjectTypeString gets called with an ObjectType that is not in the switch
+ * statement, the function will return the string "object", and emit a debug message.
+ * In that case, make sure you've added the newly supported type to the switch statement.
+ */
+static char *
+GetObjectTypeString(ObjectType objType)
+{
+	switch (objType)
+	{
+		case OBJECT_COLLATION:
+		{
+			return "collation";
+		}
+
+		case OBJECT_DATABASE:
+		{
+			return "database";
+		}
+
+		case OBJECT_EXTENSION:
+		{
+			return "extension";
+		}
+
+		case OBJECT_FUNCTION:
+		{
+			return "function";
+		}
+
+		case OBJECT_SCHEMA:
+		{
+			return "schema";
+		}
+
+		case OBJECT_TYPE:
+		{
+			return "type";
+		}
+
+		default:
+		{
+			ereport(DEBUG1, (errmsg("unsupported object type"),
+			                 errdetail("Please add string conversion for the object.")));
+			return "object";
+		}
+	}
+}
+
+
 /*
  * AlterTableConstraintCheck returns if the given query is an ALTER TABLE
  * constraint check query.
diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h
index 3648dbc1b..dd10c511d 100644
--- a/src/include/distributed/multi_executor.h
+++ b/src/include/distributed/multi_executor.h
@@ -139,6 +139,7 @@ extern void ExecuteQueryIntoDestReceiver(Query *query, ParamListInfo params,
 extern void ExecutePlanIntoDestReceiver(PlannedStmt *queryPlan, ParamListInfo params,
 										DestReceiver *dest);
 extern void SetLocalMultiShardModifyModeToSequential(void);
+extern void EnsureSequentialMode(ObjectType objType);
 extern void SetLocalForceMaxQueryParallelization(void);
 extern void SortTupleStore(CitusScanState *scanState);
 extern bool DistributedPlanModifiesDatabase(DistributedPlan *plan);
diff --git a/src/test/regress/expected/alter_database_owner.out b/src/test/regress/expected/alter_database_owner.out
index 2e5e54aca..1fb118242 100644
--- a/src/test/regress/expected/alter_database_owner.out
+++ b/src/test/regress/expected/alter_database_owner.out
@@ -165,8 +165,8 @@ SELECT count(*) FROM t; -- parallel execution;
 (1 row)
 
 ALTER DATABASE regression OWNER TO database_owner_2; -- should ERROR
-ERROR: cannot create or modify database because there was a parallel operation on a distributed table in the transaction
-DETAIL: When creating or altering a database, Citus needs to
perform all operations over a single connection per node to ensure consistency. +ERROR: cannot run database command because there was a parallel operation on a distributed table in the transaction +DETAIL: When running command on/for a distributed database, Citus needs to perform all operations over a single connection per node to ensure consistency. HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';" ROLLBACK; -- list the owners of the current database on all nodes diff --git a/src/test/regress/expected/forcedelegation_functions.out b/src/test/regress/expected/forcedelegation_functions.out index abfcaced8..c4e07d5f9 100644 --- a/src/test/regress/expected/forcedelegation_functions.out +++ b/src/test/regress/expected/forcedelegation_functions.out @@ -304,7 +304,7 @@ END; $$ LANGUAGE plpgsql; SELECT create_distributed_function('func_calls_forcepush_func()'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -312,7 +312,7 @@ DETAIL: A distributed function is created. To make sure subsequent commands see SELECT create_distributed_function('inner_force_delegation_function(int)', '$1', colocate_with := 'test_nested', force_delegation := true); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. 
To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -475,7 +475,7 @@ END; $$ LANGUAGE plpgsql; SELECT create_distributed_function('test_recursive(int)', '$1', colocate_with := 'test_nested', force_delegation := true); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -550,7 +550,7 @@ SELECT create_distributed_function( colocate_with := 'test_forcepushdown', force_delegation := true); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -620,7 +620,7 @@ END; $$ LANGUAGE plpgsql; SELECT create_distributed_function('inner_emp(text)','empname', force_delegation := true); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. 
To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -656,7 +656,7 @@ SELECT create_distributed_function( force_delegation := true ); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -731,7 +731,7 @@ SELECT create_distributed_function( force_delegation := true ); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -809,7 +809,7 @@ SELECT create_distributed_function( force_delegation := true ); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. 
To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -827,7 +827,7 @@ SELECT create_distributed_function( force_delegation := true ); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -845,7 +845,7 @@ SELECT create_distributed_function( force_delegation := true ); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -953,7 +953,7 @@ SELECT create_distributed_function( force_delegation := true ); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. 
To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -975,7 +975,7 @@ SELECT create_distributed_function( force_delegation := true ); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -996,7 +996,7 @@ SELECT create_distributed_function( force_delegation := true ); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -1017,7 +1017,7 @@ SELECT create_distributed_function( force_delegation := true ); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. 
To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -1039,7 +1039,7 @@ SELECT create_distributed_function( force_delegation := true ); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -1205,7 +1205,7 @@ END; $$ LANGUAGE plpgsql; SELECT create_distributed_function('test_prepare(int,int)','x',force_delegation :=true, colocate_with := 'table_test_prepare'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -1400,7 +1400,7 @@ $$ LANGUAGE plpgsql; SELECT create_distributed_function('test(int)', 'x', colocate_with := 'test_perform', force_delegation := true); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. 
To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- diff --git a/src/test/regress/expected/local_dist_join_mixed.out b/src/test/regress/expected/local_dist_join_mixed.out index 0e3e0aad4..5566186b5 100644 --- a/src/test/regress/expected/local_dist_join_mixed.out +++ b/src/test/regress/expected/local_dist_join_mixed.out @@ -1602,6 +1602,8 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c (1 row) DROP SCHEMA local_dist_join_mixed CASCADE; +DEBUG: switching to sequential query execution mode +DETAIL: A command for a distributed schema is run. To make sure subsequent commands see the schema correctly we need to make sure to use only one connection for all future commands NOTICE: drop cascades to 7 other objects DETAIL: drop cascades to table distributed drop cascades to table reference diff --git a/src/test/regress/expected/multi_function_in_join.out b/src/test/regress/expected/multi_function_in_join.out index 5d1b2a672..c80b26c57 100644 --- a/src/test/regress/expected/multi_function_in_join.out +++ b/src/test/regress/expected/multi_function_in_join.out @@ -41,7 +41,7 @@ AS 'SELECT $1 + $2;' LANGUAGE SQL; SELECT create_distributed_function('add(integer,integer)'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. 
To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_function_in_join_0.out b/src/test/regress/expected/multi_function_in_join_0.out index 21279ab8d..5b818855b 100644 --- a/src/test/regress/expected/multi_function_in_join_0.out +++ b/src/test/regress/expected/multi_function_in_join_0.out @@ -41,7 +41,7 @@ AS 'SELECT $1 + $2;' LANGUAGE SQL; SELECT create_distributed_function('add(integer,integer)'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_mx_add_coordinator.out b/src/test/regress/expected/multi_mx_add_coordinator.out index 33aec18d8..045563d6f 100644 --- a/src/test/regress/expected/multi_mx_add_coordinator.out +++ b/src/test/regress/expected/multi_mx_add_coordinator.out @@ -141,7 +141,7 @@ END; $$; SELECT create_distributed_function('my_group_id()', colocate_with := 'ref'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. 
To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_mx_call.out b/src/test/regress/expected/multi_mx_call.out index 64b033d41..37c989885 100644 --- a/src/test/regress/expected/multi_mx_call.out +++ b/src/test/regress/expected/multi_mx_call.out @@ -416,7 +416,7 @@ CALL multi_mx_call.mx_call_proc_tx(10); -- after distribution ... select create_distributed_function('mx_call_proc_tx(int)', '$1', 'mx_call_dist_table_1'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -488,7 +488,7 @@ BEGIN END;$$; select create_distributed_function('mx_call_proc_raise(int)', '$1', 'mx_call_dist_table_1'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. 
To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -554,7 +554,7 @@ CREATE FUNCTION mx_call_add(int, int) RETURNS int AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE; SELECT create_distributed_function('mx_call_add(int,int)'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_mx_call_0.out b/src/test/regress/expected/multi_mx_call_0.out index 496e735c9..6f33d4dae 100644 --- a/src/test/regress/expected/multi_mx_call_0.out +++ b/src/test/regress/expected/multi_mx_call_0.out @@ -416,7 +416,7 @@ CALL multi_mx_call.mx_call_proc_tx(10); -- after distribution ... select create_distributed_function('mx_call_proc_tx(int)', '$1', 'mx_call_dist_table_1'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. 
To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -488,7 +488,7 @@ BEGIN END;$$; select create_distributed_function('mx_call_proc_raise(int)', '$1', 'mx_call_dist_table_1'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -554,7 +554,7 @@ CREATE FUNCTION mx_call_add(int, int) RETURNS int AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE; SELECT create_distributed_function('mx_call_add(int,int)'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. 
To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_mx_function_call_delegation.out b/src/test/regress/expected/multi_mx_function_call_delegation.out index 8192ee35c..954c23420 100644 --- a/src/test/regress/expected/multi_mx_function_call_delegation.out +++ b/src/test/regress/expected/multi_mx_function_call_delegation.out @@ -213,7 +213,7 @@ select colocate_proc_with_table('squares', 'mx_call_dist_table_2'::regclass, 0); select create_distributed_function('mx_call_func_bigint(bigint,bigint)', 'x', colocate_with := 'mx_call_dist_table_bigint'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -224,7 +224,7 @@ select create_distributed_function('mx_call_func_bigint_force(bigint,bigint)', ' colocate_with := 'mx_call_dist_table_2', force_delegation := true); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. 
To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -420,7 +420,7 @@ select mx_call_func_tbl(10); -- after distribution ... select create_distributed_function('mx_call_func_tbl(int)', '$1', 'mx_call_dist_table_1'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -443,7 +443,7 @@ BEGIN END;$$; select create_distributed_function('mx_call_func_raise(int)', '$1', 'mx_call_dist_table_1'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands create_distributed_function --------------------------------------------------------------------- @@ -595,7 +595,7 @@ CREATE FUNCTION mx_call_add(int, int) RETURNS int AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE; SELECT create_distributed_function('mx_call_add(int,int)', '$1'); DEBUG: switching to sequential query execution mode -DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands +DETAIL: A command for a distributed function is run. 
To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
 create_distributed_function
 ---------------------------------------------------------------------
 
diff --git a/src/test/regress/expected/multi_mx_function_call_delegation_0.out b/src/test/regress/expected/multi_mx_function_call_delegation_0.out
index 9125847b4..66c0f89d2 100644
--- a/src/test/regress/expected/multi_mx_function_call_delegation_0.out
+++ b/src/test/regress/expected/multi_mx_function_call_delegation_0.out
@@ -213,7 +213,7 @@ select colocate_proc_with_table('squares', 'mx_call_dist_table_2'::regclass, 0);
 select create_distributed_function('mx_call_func_bigint(bigint,bigint)', 'x',
 colocate_with := 'mx_call_dist_table_bigint');
 DEBUG: switching to sequential query execution mode
-DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
+DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
 create_distributed_function
 ---------------------------------------------------------------------
 
@@ -224,7 +224,7 @@ select create_distributed_function('mx_call_func_bigint_force(bigint,bigint)', '
 colocate_with := 'mx_call_dist_table_2',
 force_delegation := true);
 DEBUG: switching to sequential query execution mode
-DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
+DETAIL: A command for a distributed function is run.
To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
 create_distributed_function
 ---------------------------------------------------------------------
 
@@ -420,7 +420,7 @@ select mx_call_func_tbl(10);
 -- after distribution ...
 select create_distributed_function('mx_call_func_tbl(int)', '$1', 'mx_call_dist_table_1');
 DEBUG: switching to sequential query execution mode
-DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
+DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
 create_distributed_function
 ---------------------------------------------------------------------
 
@@ -443,7 +443,7 @@ BEGIN
 END;$$;
 select create_distributed_function('mx_call_func_raise(int)', '$1', 'mx_call_dist_table_1');
 DEBUG: switching to sequential query execution mode
-DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
+DETAIL: A command for a distributed function is run. To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
 create_distributed_function
 ---------------------------------------------------------------------
 
@@ -595,7 +595,7 @@ CREATE FUNCTION mx_call_add(int, int) RETURNS int
 AS 'select $1 + $2;' LANGUAGE SQL IMMUTABLE;
 SELECT create_distributed_function('mx_call_add(int,int)', '$1');
 DEBUG: switching to sequential query execution mode
-DETAIL: A distributed function is created. To make sure subsequent commands see the type correctly we need to make sure to use only one connection for all future commands
+DETAIL: A command for a distributed function is run.
To make sure subsequent commands see the function correctly we need to make sure to use only one connection for all future commands
 create_distributed_function
 ---------------------------------------------------------------------
 
diff --git a/src/test/regress/expected/multi_schema_support.out b/src/test/regress/expected/multi_schema_support.out
index ee4879edf..6e3ac0dc8 100644
--- a/src/test/regress/expected/multi_schema_support.out
+++ b/src/test/regress/expected/multi_schema_support.out
@@ -1428,8 +1428,8 @@ BEGIN;
 (1 row)
 
 ALTER SCHEMA bar RENAME TO foo;
-ERROR: cannot create or modify schema because there was a parallel operation on a distributed table in the transaction
-DETAIL: When creating, altering, or dropping a schema, Citus needs to perform all operations over a single connection per node to ensure consistency.
+ERROR: cannot run schema command because there was a parallel operation on a distributed table in the transaction
+DETAIL: When running command on/for a distributed schema, Citus needs to perform all operations over a single connection per node to ensure consistency.
 HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
 ROLLBACK;
 BEGIN;
diff --git a/src/test/regress/expected/non_colocated_subquery_joins.out b/src/test/regress/expected/non_colocated_subquery_joins.out
index 4fa460f4a..0776f47cf 100644
--- a/src/test/regress/expected/non_colocated_subquery_joins.out
+++ b/src/test/regress/expected/non_colocated_subquery_joins.out
@@ -8,6 +8,8 @@
 -- ===================================================================
 SET client_min_messages TO DEBUG1;
 CREATE SCHEMA non_colocated_subquery;
+DEBUG: switching to sequential query execution mode
+DETAIL: A command for a distributed schema is run.
To make sure subsequent commands see the schema correctly we need to make sure to use only one connection for all future commands
 SET search_path TO non_colocated_subquery, public;
 -- we don't use the data anyway
 CREATE TABLE users_table_local AS SELECT * FROM users_table LIMIT 0;

From abd5b1c5068690e8be9207942ea2d32a89f381e7 Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Mon, 14 Feb 2022 17:34:32 +0100
Subject: [PATCH 7/7] Prevent any monitoring view/udf to show already exited backends
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

The low-level StoreAllActiveTransactions() function filters out
backends that have exited.

Before this commit, if you ran pgbench, you would still see its
backends show up after it had finished:

```SQL
select count(*) from get_global_active_transactions();
┌───────┐
│ count │
├───────┤
│   538 │
└───────┘
```

After this patch, only active backends show up:

```SQL
select count(*) from get_global_active_transactions();
┌───────┐
│ count │
├───────┤
│    72 │
└───────┘
```
---
 src/backend/distributed/shared_library_init.c      | 3 +++
 src/backend/distributed/transaction/backend_data.c | 3 +--
 src/include/distributed/backend_data.h             | 1 +
 3 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c
index 5dbc7d3e1..521d3e406 100644
--- a/src/backend/distributed/shared_library_init.c
+++ b/src/backend/distributed/shared_library_init.c
@@ -513,6 +513,9 @@ CitusCleanupConnectionsAtExit(int code, Datum arg)
 	 * are already given away.
 	 */
 	DeallocateReservedConnections();
+
+	/* we don't want any monitoring view/udf to show already exited backends */
+	UnSetGlobalPID();
 }
 
 
diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c
index 4e037e30c..bc54e1da9 100644
--- a/src/backend/distributed/transaction/backend_data.c
+++ b/src/backend/distributed/transaction/backend_data.c
@@ -90,7 +90,6 @@ static BackendData *MyBackendData = NULL;
 
 static void BackendManagementShmemInit(void);
 static size_t BackendManagementShmemSize(void);
-static void UnSetGlobalPID(void);
 
 
 PG_FUNCTION_INFO_V1(assign_distributed_transaction_id);
@@ -674,7 +673,7 @@ UnSetDistributedTransactionId(void)
 /*
  * UnSetGlobalPID resets the global pid for the current backend.
  */
-static void
+void
 UnSetGlobalPID(void)
 {
 	/* backend does not exist if the extension is not created */
diff --git a/src/include/distributed/backend_data.h b/src/include/distributed/backend_data.h
index bf9a57fd1..7f3a81e88 100644
--- a/src/include/distributed/backend_data.h
+++ b/src/include/distributed/backend_data.h
@@ -62,6 +62,7 @@ extern void InitializeBackendData(void);
 extern void LockBackendSharedMemory(LWLockMode lockMode);
 extern void UnlockBackendSharedMemory(void);
 extern void UnSetDistributedTransactionId(void);
+extern void UnSetGlobalPID(void);
 extern void AssignDistributedTransactionId(void);
 extern void MarkCitusInitiatedCoordinatorBackend(void);
 extern void AssignGlobalPID(void);
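
Note on PATCH 7/7: the idea of clearing a backend's global PID at exit so that monitoring code stops counting it can be illustrated with a small standalone sketch. This is not Citus code. `BackendSlot`, `assign_global_pid`, `unset_global_pid`, `count_active_backends`, the fixed-size array, and the convention that 0 means "unset" are all hypothetical names and assumptions made for illustration; the real logic lives in `UnSetGlobalPID()` and `StoreAllActiveTransactions()` and may filter differently.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical stand-in for per-backend slots kept in shared memory. */
#define MAX_BACKENDS 8
#define INVALID_GLOBAL_PID 0 /* assumption: 0 marks a slot with no live backend */

typedef struct BackendSlot
{
	uint64_t global_pid;
} BackendSlot;

static BackendSlot slots[MAX_BACKENDS];

/* Backend start: publish a nonzero id in the backend's slot. */
static void
assign_global_pid(size_t slot, uint64_t pid)
{
	assert(slot < MAX_BACKENDS && pid != INVALID_GLOBAL_PID);
	slots[slot].global_pid = pid;
}

/* Backend exit hook: reset the id so the slot no longer looks active. */
static void
unset_global_pid(size_t slot)
{
	assert(slot < MAX_BACKENDS);
	slots[slot].global_pid = INVALID_GLOBAL_PID;
}

/* Monitoring side: count only the slots that still carry a valid id. */
static size_t
count_active_backends(void)
{
	size_t active = 0;

	for (size_t i = 0; i < MAX_BACKENDS; i++)
	{
		if (slots[i].global_pid != INVALID_GLOBAL_PID)
		{
			active++;
		}
	}

	return active;
}
```

In this sketch, a backend that exits without calling `unset_global_pid` would keep being counted, which mirrors the inflated count shown in the commit message before the fix.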