From 7cb1d6ae0604c16f62cbe6f98e81bb745bc93843 Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Mon, 27 Dec 2021 12:29:32 +0100
Subject: [PATCH 1/6] Improve metadata connections

With https://github.com/citusdata/citus/pull/5493 we introduced metadata-specific
connections, guaranteeing that there is a single metadata connection. Note,
however, that this connection can also be used for any other operation; in
other words, it is not reserved solely for metadata operations.

However, https://github.com/citusdata/citus-enterprise/issues/715 showed us
that the logic has a flaw: we allowed ineligible connections, such as
exclusively claimed or not fully initialized connections, to be picked as
metadata connections.

With this commit, we make sure that we only consider eligible connections
for metadata operations.
---
 .../connection/connection_management.c       | 61 +++++++------------
 1 file changed, 22 insertions(+), 39 deletions(-)

diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c
index dc53cdda5..ce30e2b0f 100644
--- a/src/backend/distributed/connection/connection_management.c
+++ b/src/backend/distributed/connection/connection_management.c
@@ -56,9 +56,7 @@ static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
 static void StartConnectionEstablishment(MultiConnection *connectionn,
 										 ConnectionHashKey *key);
 static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
-#ifdef USE_ASSERT_CHECKING
-static void AssertSingleMetadataConnectionExists(dlist_head *connections);
-#endif
+static void ErrorIfMultipleMetadataConnectionExists(dlist_head *connections);
 static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
 static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
 static bool ShouldShutdownConnection(MultiConnection *connection, const int
@@ -420,6 +418,8 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
 static MultiConnection *
 FindAvailableConnection(dlist_head *connections, uint32 flags)
 {
+	List *metadataConnectionCandidateList = NIL;
+
 	dlist_iter iter;
 	dlist_foreach(iter, connections)
 	{
@@ -473,52 +473,40 @@ FindAvailableConnection(dlist_head *connections, uint32 flags)
 		{
 			/*
 			 * The caller requested a metadata connection, and this is not the
+			 * metadata connection. Still, this is a candidate for becoming a
 			 * metadata connection.
 			 */
+			metadataConnectionCandidateList =
+				lappend(metadataConnectionCandidateList, connection);
 			continue;
 		}
-		else
-		{
-			/*
-			 * Now that we found metadata connection. We do some sanity
-			 * checks.
-			 */
-			#ifdef USE_ASSERT_CHECKING
-			AssertSingleMetadataConnectionExists(connections);
-			#endif
-
-			/*
-			 * Connection is in use for an ongoing operation. Metadata
-			 * connection cannot be claimed exclusively.
-			 */
-			if (connection->claimedExclusively)
-			{
-				ereport(ERROR, (errmsg("metadata connections cannot be "
-									   "claimed exclusively")));
-			}
-		}
 
 		return connection;
 	}
 
-	if ((flags & REQUIRE_METADATA_CONNECTION) && !dlist_is_empty(connections))
+	if ((flags & REQUIRE_METADATA_CONNECTION) &&
+		list_length(metadataConnectionCandidateList) > 0)
 	{
 		/*
-		 * Caller asked a metadata connection, and we couldn't find in the
-		 * above list. So, we pick the first connection as the metadata
-		 * connection.
+		 * Caller asked for a metadata connection, and we couldn't find a connection
+		 * that has already been used for metadata operations.
+ * + * So, we pick the first connection as the metadata connection. */ MultiConnection *metadataConnection = - dlist_container(MultiConnection, connectionNode, - dlist_head_node(connections)); + linitial(metadataConnectionCandidateList); + Assert(!metadataConnection->claimedExclusively); /* remember that we use this connection for metadata operations */ metadataConnection->useForMetadataOperations = true; - #ifdef USE_ASSERT_CHECKING - AssertSingleMetadataConnectionExists(connections); - #endif + /* + * We cannot have multiple metadata connections. If we see + * this error, it is likely that there is a bug in connection + * management. + */ + ErrorIfMultipleMetadataConnectionExists(connections); return metadataConnection; } @@ -527,14 +515,12 @@ FindAvailableConnection(dlist_head *connections, uint32 flags) } -#ifdef USE_ASSERT_CHECKING - /* - * AssertSingleMetadataConnectionExists throws an error if the + * ErrorIfMultipleMetadataConnectionExists throws an error if the * input connection dlist contains more than one metadata connections. */ static void -AssertSingleMetadataConnectionExists(dlist_head *connections) +ErrorIfMultipleMetadataConnectionExists(dlist_head *connections) { bool foundMetadataConnection = false; dlist_iter iter; @@ -556,9 +542,6 @@ AssertSingleMetadataConnectionExists(dlist_head *connections) } -#endif /* USE_ASSERT_CHECKING */ - - /* * CloseAllConnectionsAfterTransaction sets the forceClose flag of all the * connections. This is mainly done when citus.node_conninfo changes. From 3c834e66936da6b0d76a8e024f21b61783b7ed0d Mon Sep 17 00:00:00 2001 From: Ahmet Gedemenli Date: Fri, 7 Jan 2022 18:12:23 +0300 Subject: [PATCH 2/6] Disable foreign distributed tables (#5605) * Disable foreign distributed tables * Add warning for existing distributed foreign tables --- .../commands/create_distributed_table.c | 21 +++ src/backend/distributed/commands/table.c | 2 +- .../distributed/planner/distributed_planner.c | 63 ++++++++- .../regress/expected/foreign_tables_mx.out | 31 +---- .../regress/expected/mixed_relkind_tests.out | 68 --------- .../expected/multi_colocation_utils.out | 131 +++++++----------- .../regress/expected/multi_create_shards.out | 46 ------ .../expected/multi_generate_ddl_commands.out | 50 ------- .../regress/expected/multi_repair_shards.out | 27 ---- .../regress/expected/undistribute_table.out | 39 ------ src/test/regress/sql/foreign_tables_mx.sql | 4 +- src/test/regress/sql/mixed_relkind_tests.sql | 27 ---- .../regress/sql/multi_colocation_utils.sql | 9 +- src/test/regress/sql/multi_create_shards.sql | 27 ---- .../sql/multi_generate_ddl_commands.sql | 26 ---- src/test/regress/sql/multi_repair_shards.sql | 26 ---- src/test/regress/sql/undistribute_table.sql | 14 -- 17 files changed, 138 insertions(+), 473 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 9a200a059..041415686 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -129,6 +129,7 @@ static void DoCopyFromLocalTableIntoShards(Relation distributedRelation, TupleTableSlot *slot, EState *estate); static void ErrorIfTemporaryTable(Oid relationId); +static void ErrorIfForeignTable(Oid relationOid); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(master_create_distributed_table); @@ -333,6 +334,7 @@ EnsureCitusTableCanBeCreated(Oid relationOid) EnsureRelationExists(relationOid); 
EnsureTableOwner(relationOid); ErrorIfTemporaryTable(relationOid); + ErrorIfForeignTable(relationOid); /* * We should do this check here since the codes in the following lines rely @@ -1880,3 +1882,22 @@ DistributionColumnUsesGeneratedStoredColumn(TupleDesc relationDesc, return false; } + + +/* + * ErrorIfForeignTable errors out if the relation with given relationOid + * is a foreign table. + */ +static void +ErrorIfForeignTable(Oid relationOid) +{ + if (IsForeignTable(relationOid)) + { + char *relname = get_rel_name(relationOid); + char *qualifiedRelname = generate_qualified_relation_name(relationOid); + ereport(ERROR, (errmsg("foreign tables cannot be distributed"), + (errhint("Can add foreign table \"%s\" to metadata by running: " + "SELECT citus_add_local_table_to_metadata($$%s$$);", + relname, qualifiedRelname)))); + } +} diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index 83ae78a91..26338570c 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -54,7 +54,6 @@ /* controlled via GUC, should be accessed via GetEnableLocalReferenceForeignKeys() */ bool EnableLocalReferenceForeignKeys = true; - /* Local functions forward declarations for unsupported command checks */ static void PostprocessCreateTableStmtForeignKeys(CreateStmt *createStatement); static void PostprocessCreateTableStmtPartitionOf(CreateStmt *createStatement, @@ -1786,6 +1785,7 @@ PreprocessAlterTableSchemaStmt(Node *node, const char *queryString, { return NIL; } + DDLJob *ddlJob = palloc0(sizeof(DDLJob)); QualifyTreeNode((Node *) stmt); ddlJob->targetRelationId = relationId; diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 639b22f47..b318a3f3c 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -23,6 +23,7 @@ #include "distributed/citus_nodefuncs.h" #include "distributed/citus_nodes.h" #include "distributed/citus_ruleutils.h" +#include "distributed/commands.h" #include "distributed/cte_inline.h" #include "distributed/function_call_delegation.h" #include "distributed/insert_select_planner.h" @@ -71,7 +72,8 @@ static uint64 NextPlanId = 1; /* keep track of planner call stack levels */ int PlannerLevel = 0; -static bool ListContainsDistributedTableRTE(List *rangeTableList); +static bool ListContainsDistributedTableRTE(List *rangeTableList, + bool *maybeHasForeignDistributedTable); static bool IsUpdateOrDelete(Query *query); static PlannedStmt * CreateDistributedPlannedStmt( DistributedPlanningContext *planContext); @@ -123,6 +125,7 @@ static PlannedStmt * PlanDistributedStmt(DistributedPlanningContext *planContext int rteIdCounter); static RTEListProperties * GetRTEListProperties(List *rangeTableList); static List * TranslatedVars(PlannerInfo *root, int relationIndex); +static void WarnIfListHasForeignDistributedTable(List *rangeTableList); /* Distributed planner hook */ @@ -149,10 +152,18 @@ distributed_planner(Query *parse, } else if (CitusHasBeenLoaded()) { - needsDistributedPlanning = ListContainsDistributedTableRTE(rangeTableList); + bool maybeHasForeignDistributedTable = false; + needsDistributedPlanning = + ListContainsDistributedTableRTE(rangeTableList, + &maybeHasForeignDistributedTable); if (needsDistributedPlanning) { fastPathRouterQuery = FastPathRouterQuery(parse, &distributionKeyValue); + + if (maybeHasForeignDistributedTable) + { + 
WarnIfListHasForeignDistributedTable(rangeTableList); + } } } @@ -311,17 +322,19 @@ NeedsDistributedPlanning(Query *query) List *allRTEs = ExtractRangeTableEntryList(query); - return ListContainsDistributedTableRTE(allRTEs); + return ListContainsDistributedTableRTE(allRTEs, NULL); } /* * ListContainsDistributedTableRTE gets a list of range table entries * and returns true if there is at least one distributed relation range - * table entry in the list. + * table entry in the list. The boolean maybeHasForeignDistributedTable + * variable is set to true if the list contains a foreign table. */ static bool -ListContainsDistributedTableRTE(List *rangeTableList) +ListContainsDistributedTableRTE(List *rangeTableList, + bool *maybeHasForeignDistributedTable) { ListCell *rangeTableCell = NULL; @@ -336,6 +349,12 @@ ListContainsDistributedTableRTE(List *rangeTableList) if (IsCitusTable(rangeTableEntry->relid)) { + if (maybeHasForeignDistributedTable != NULL && + IsForeignTable(rangeTableEntry->relid)) + { + *maybeHasForeignDistributedTable = true; + } + return true; } } @@ -2408,3 +2427,37 @@ GetRTEListProperties(List *rangeTableList) return rteListProperties; } + + +/* + * WarnIfListHasForeignDistributedTable iterates the given list and logs a WARNING + * if the given relation is a distributed foreign table. + * We do that because now we only support Citus Local Tables for foreign tables. + */ +static void +WarnIfListHasForeignDistributedTable(List *rangeTableList) +{ + static bool DistributedForeignTableWarningPrompted = false; + + RangeTblEntry *rangeTableEntry = NULL; + foreach_ptr(rangeTableEntry, rangeTableList) + { + if (DistributedForeignTableWarningPrompted) + { + return; + } + + Oid relationId = rangeTableEntry->relid; + if (IsForeignTable(relationId) && IsCitusTable(relationId) && + !IsCitusTableType(relationId, CITUS_LOCAL_TABLE)) + { + DistributedForeignTableWarningPrompted = true; + ereport(WARNING, (errmsg( + "support for distributed foreign tables are deprecated, " + "please use Citus managed local tables"), + (errdetail( + "Foreign tables can be added to metadata using UDF: " + "citus_add_local_table_to_metadata()")))); + } + } +} diff --git a/src/test/regress/expected/foreign_tables_mx.out b/src/test/regress/expected/foreign_tables_mx.out index 21c7d6c69..a3de72259 100644 --- a/src/test/regress/expected/foreign_tables_mx.out +++ b/src/test/regress/expected/foreign_tables_mx.out @@ -199,36 +199,11 @@ NOTICE: renaming the new table to foreign_tables_schema_mx.foreign_table (1 row) +-- both should error out SELECT create_distributed_table('foreign_table','data'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -SELECT undistribute_table('foreign_table'); -NOTICE: creating a new table for foreign_tables_schema_mx.foreign_table -NOTICE: dropping the old foreign_tables_schema_mx.foreign_table -NOTICE: renaming the new table to foreign_tables_schema_mx.foreign_table - undistribute_table ---------------------------------------------------------------------- - -(1 row) - +ERROR: foreign tables cannot be distributed SELECT create_reference_table('foreign_table'); - create_reference_table ---------------------------------------------------------------------- - -(1 row) - -SELECT undistribute_table('foreign_table'); -NOTICE: creating a new table for foreign_tables_schema_mx.foreign_table -NOTICE: dropping the old foreign_tables_schema_mx.foreign_table -NOTICE: renaming the new table to foreign_tables_schema_mx.foreign_table - 
undistribute_table ---------------------------------------------------------------------- - -(1 row) - +ERROR: foreign tables cannot be distributed INSERT INTO foreign_table_test VALUES (1, 'testt'); SELECT * FROM foreign_table ORDER BY a; data | a diff --git a/src/test/regress/expected/mixed_relkind_tests.out b/src/test/regress/expected/mixed_relkind_tests.out index cb8e50499..20cb6ebac 100644 --- a/src/test/regress/expected/mixed_relkind_tests.out +++ b/src/test/regress/expected/mixed_relkind_tests.out @@ -62,13 +62,6 @@ SELECT create_distributed_table('partitioned_distributed_table', 'a'); CREATE VIEW view_on_part_dist AS SELECT * FROM partitioned_distributed_table; CREATE MATERIALIZED VIEW mat_view_on_part_dist AS SELECT * FROM partitioned_distributed_table; -CREATE FOREIGN TABLE foreign_distributed_table (a int, b int) SERVER fake_fdw_server; -SELECT create_distributed_table('foreign_distributed_table', 'a'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -- and insert some data INSERT INTO postgres_local_table SELECT * FROM generate_series(0, 5); INSERT INTO partitioned_postgres_local_table SELECT * FROM generate_series(0, 5); @@ -145,12 +138,6 @@ SELECT * FROM unlogged_distributed_table UNION SELECT 1,1 ORDER BY 1,2; 5 | 6 (7 rows) -SELECT * from foreign_distributed_table UNION SELECT 1,1 ORDER BY 1,2; - a | b ---------------------------------------------------------------------- - 1 | 1 -(1 row) - SELECT 1 UNION SELECT * FROM citus_local_table ORDER BY 1; ?column? --------------------------------------------------------------------- @@ -378,17 +365,6 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c 455 (1 row) -SELECT COUNT(*) FROM - (SELECT *, random() FROM unlogged_distributed_table) AS foo, - (SELECT *, random() FROM foreign_distributed_table) AS bar -WHERE foo.a = bar.b; -DEBUG: generating subplan XXX_1 for subquery SELECT a, b, random() AS random FROM mixed_relkind_tests.foreign_distributed_table -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT unlogged_distributed_table.a, unlogged_distributed_table.b, random() AS random FROM mixed_relkind_tests.unlogged_distributed_table) foo, (SELECT intermediate_result.a, intermediate_result.b, intermediate_result.random FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer, random double precision)) bar WHERE (foo.a OPERATOR(pg_catalog.=) bar.b) - count ---------------------------------------------------------------------- - 0 -(1 row) - UPDATE partitioned_distributed_table SET b = foo.a FROM citus_local_table AS foo; DEBUG: Wrapping relation "citus_local_table" "foo" to a subquery DEBUG: generating subplan XXX_1 for subquery SELECT a FROM mixed_relkind_tests.citus_local_table foo WHERE true @@ -486,15 +462,6 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS c 1014 (1 row) -WITH cte_1 AS MATERIALIZED (SELECT * FROM foreign_distributed_table) - SELECT COUNT(*) FROM cte_1 JOIN foreign_distributed_table USING (a); -DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT a, b FROM mixed_relkind_tests.foreign_distributed_table -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1 JOIN 
mixed_relkind_tests.foreign_distributed_table USING (a)) - count ---------------------------------------------------------------------- - 0 -(1 row) - WITH cte_1 AS MATERIALIZED (SELECT * FROM partitioned_distributed_table) SELECT COUNT(*) FROM cte_1 JOIN partitioned_distributed_table USING (b); DEBUG: generating subplan XXX_1 for CTE cte_1: SELECT a, b FROM mixed_relkind_tests.partitioned_distributed_table @@ -658,18 +625,6 @@ $Q$); Task Count: 4 (4 rows) -SELECT public.coordinator_plan($Q$ -EXPLAIN (COSTS OFF) -SELECT a, COUNT(*) OVER (PARTITION BY a) FROM foreign_distributed_table ORDER BY 1,2; -$Q$); - coordinator_plan ---------------------------------------------------------------------- - Sort - Sort Key: remote_scan.a, remote_scan.count - -> Custom Scan (Citus Adaptive) - Task Count: 4 -(4 rows) - -- pull to coordinator WINDOW SELECT public.coordinator_plan($Q$ EXPLAIN (COSTS OFF) @@ -686,21 +641,6 @@ $Q$); Task Count: 4 (7 rows) -SELECT public.coordinator_plan($Q$ -EXPLAIN (COSTS OFF) -SELECT a, COUNT(*) OVER (PARTITION BY a+1) FROM foreign_distributed_table ORDER BY 1,2; -$Q$); - coordinator_plan ---------------------------------------------------------------------- - Sort - Sort Key: remote_scan.a, (count(*) OVER (?)) - -> WindowAgg - -> Sort - Sort Key: remote_scan.worker_column_2 - -> Custom Scan (Citus Adaptive) - Task Count: 4 -(7 rows) - -- FOR UPDATE SELECT * FROM partitioned_distributed_table WHERE a = 1 ORDER BY 1,2 FOR UPDATE; a | b @@ -737,14 +677,6 @@ BEGIN; --------------------------------------------------------------------- (0 rows) -COMMIT; -BEGIN; - ALTER TABLE foreign_distributed_table DROP COLUMN b CASCADE; - SELECT * FROM foreign_distributed_table; - a ---------------------------------------------------------------------- -(0 rows) - COMMIT; -- cleanup at exit DROP SCHEMA mixed_relkind_tests CASCADE; diff --git a/src/test/regress/expected/multi_colocation_utils.out b/src/test/regress/expected/multi_colocation_utils.out index b4da5f34f..dba77faa8 100644 --- a/src/test/regress/expected/multi_colocation_utils.out +++ b/src/test/regress/expected/multi_colocation_utils.out @@ -425,14 +425,6 @@ SELECT create_distributed_table('table_range', 'id', 'range'); (1 row) --- test foreign table creation -CREATE FOREIGN TABLE table3_groupD ( id int ) SERVER fake_fdw_server; -SELECT create_distributed_table('table3_groupD', 'id'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -- check metadata SELECT * FROM pg_dist_colocation WHERE colocationid >= 1 AND colocationid < 1000 @@ -458,8 +450,7 @@ SELECT logicalrelid, colocationid FROM pg_dist_partition table2_groupc | 6 table1_groupd | 7 table2_groupd | 7 - table3_groupd | 7 -(9 rows) +(8 rows) -- check effects of dropping tables DROP TABLE table1_groupA; @@ -585,13 +576,12 @@ SELECT logicalrelid, colocationid FROM pg_dist_partition table2_groupc | 6 table1_groupd | 7 table2_groupd | 7 - table3_groupd | 7 table1_group_none_1 | 8 table2_group_none_1 | 8 table1_group_none_2 | 9 table1_group_none_3 | 10 table1_group_default | 11 -(17 rows) +(16 rows) -- check failing colocate_with options CREATE TABLE table_postgresql( id int ); @@ -621,14 +611,14 @@ ERROR: cannot colocate tables table1_groupe and table_bigint DETAIL: Distribution column types don't match for table1_groupe and table_bigint. 
-- check worker table schemas \c - - - :worker_1_port -SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table3_groupE_1300062'::regclass; +SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table3_groupE_1300054'::regclass; Column | Type | Modifiers --------------------------------------------------------------------- dummy_column | text | id | integer | (2 rows) -SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='schema_colocation.table4_groupE_1300064'::regclass; +SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='schema_colocation.table4_groupE_1300056'::regclass; Column | Type | Modifiers --------------------------------------------------------------------- id | integer | @@ -685,8 +675,6 @@ ORDER BY table1_groupb | table2_groupb | t table1_groupc | table2_groupc | t table1_groupd | table2_groupd | t - table1_groupd | table3_groupd | t - table2_groupd | table3_groupd | t table1_groupe | table2_groupe | t table1_groupe | table3_groupe | t table1_groupe | schema_colocation.table4_groupe | t @@ -699,7 +687,7 @@ ORDER BY schema_colocation.table4_groupe | table4_groupe | t table1_group_none_1 | table2_group_none_1 | t table1_groupf | table2_groupf | t -(18 rows) +(16 rows) -- check created shards SELECT @@ -766,71 +754,55 @@ ORDER BY table2_groupd | 1300048 | t | 57638 | 1073741824 | 1610612735 table2_groupd | 1300049 | t | 57637 | 1610612736 | 2147483647 table2_groupd | 1300049 | t | 57638 | 1610612736 | 2147483647 - table3_groupd | 1300050 | f | 57637 | -2147483648 | -1610612737 - table3_groupd | 1300050 | f | 57638 | -2147483648 | -1610612737 - table3_groupd | 1300051 | f | 57637 | -1610612736 | -1073741825 - table3_groupd | 1300051 | f | 57638 | -1610612736 | -1073741825 - table3_groupd | 1300052 | f | 57637 | -1073741824 | -536870913 - table3_groupd | 1300052 | f | 57638 | -1073741824 | -536870913 - table3_groupd | 1300053 | f | 57637 | -536870912 | -1 - table3_groupd | 1300053 | f | 57638 | -536870912 | -1 - table3_groupd | 1300054 | f | 57637 | 0 | 536870911 - table3_groupd | 1300054 | f | 57638 | 0 | 536870911 - table3_groupd | 1300055 | f | 57637 | 536870912 | 1073741823 - table3_groupd | 1300055 | f | 57638 | 536870912 | 1073741823 - table3_groupd | 1300056 | f | 57637 | 1073741824 | 1610612735 - table3_groupd | 1300056 | f | 57638 | 1073741824 | 1610612735 - table3_groupd | 1300057 | f | 57637 | 1610612736 | 2147483647 - table3_groupd | 1300057 | f | 57638 | 1610612736 | 2147483647 - table1_groupe | 1300058 | t | 57637 | -2147483648 | -1 - table1_groupe | 1300058 | t | 57638 | -2147483648 | -1 - table1_groupe | 1300059 | t | 57637 | 0 | 2147483647 - table1_groupe | 1300059 | t | 57638 | 0 | 2147483647 - table2_groupe | 1300060 | t | 57637 | -2147483648 | -1 - table2_groupe | 1300060 | t | 57638 | -2147483648 | -1 - table2_groupe | 1300061 | t | 57637 | 0 | 2147483647 - table2_groupe | 1300061 | t | 57638 | 0 | 2147483647 - table3_groupe | 1300062 | t | 57637 | -2147483648 | -1 - table3_groupe | 1300062 | t | 57638 | -2147483648 | -1 - table3_groupe | 1300063 | t | 57637 | 0 | 2147483647 - table3_groupe | 1300063 | t | 57638 | 0 | 2147483647 - schema_colocation.table4_groupe | 1300064 | t | 57637 | -2147483648 | -1 - schema_colocation.table4_groupe | 1300064 | t | 57638 | -2147483648 | -1 - schema_colocation.table4_groupe | 1300065 | t | 57637 | 0 | 2147483647 - schema_colocation.table4_groupe | 1300065 | t | 57638 | 0 | 2147483647 - table1_group_none_1 | 1300066 | t | 57637 | -2147483648 | -1 - 
table1_group_none_1 | 1300066 | t | 57638 | -2147483648 | -1 - table1_group_none_1 | 1300067 | t | 57637 | 0 | 2147483647 - table1_group_none_1 | 1300067 | t | 57638 | 0 | 2147483647 - table2_group_none_1 | 1300068 | t | 57637 | -2147483648 | -1 - table2_group_none_1 | 1300068 | t | 57638 | -2147483648 | -1 - table2_group_none_1 | 1300069 | t | 57637 | 0 | 2147483647 - table2_group_none_1 | 1300069 | t | 57638 | 0 | 2147483647 - table1_group_none_2 | 1300070 | t | 57637 | -2147483648 | -1 - table1_group_none_2 | 1300070 | t | 57638 | -2147483648 | -1 - table1_group_none_2 | 1300071 | t | 57637 | 0 | 2147483647 - table1_group_none_2 | 1300071 | t | 57638 | 0 | 2147483647 - table4_groupe | 1300072 | t | 57637 | -2147483648 | -1 - table4_groupe | 1300072 | t | 57638 | -2147483648 | -1 - table4_groupe | 1300073 | t | 57637 | 0 | 2147483647 - table4_groupe | 1300073 | t | 57638 | 0 | 2147483647 - table1_group_none_3 | 1300074 | t | 57637 | -2147483648 | -715827884 - table1_group_none_3 | 1300074 | t | 57638 | -2147483648 | -715827884 - table1_group_none_3 | 1300075 | t | 57637 | -715827883 | 715827881 - table1_group_none_3 | 1300075 | t | 57638 | -715827883 | 715827881 - table1_group_none_3 | 1300076 | t | 57637 | 715827882 | 2147483647 - table1_group_none_3 | 1300076 | t | 57638 | 715827882 | 2147483647 - table1_group_default | 1300077 | t | 57637 | -2147483648 | -715827884 - table1_group_default | 1300077 | t | 57638 | -2147483648 | -715827884 - table1_group_default | 1300078 | t | 57637 | -715827883 | 715827881 - table1_group_default | 1300078 | t | 57638 | -715827883 | 715827881 - table1_group_default | 1300079 | t | 57637 | 715827882 | 2147483647 - table1_group_default | 1300079 | t | 57638 | 715827882 | 2147483647 + table1_groupe | 1300050 | t | 57637 | -2147483648 | -1 + table1_groupe | 1300050 | t | 57638 | -2147483648 | -1 + table1_groupe | 1300051 | t | 57637 | 0 | 2147483647 + table1_groupe | 1300051 | t | 57638 | 0 | 2147483647 + table2_groupe | 1300052 | t | 57637 | -2147483648 | -1 + table2_groupe | 1300052 | t | 57638 | -2147483648 | -1 + table2_groupe | 1300053 | t | 57637 | 0 | 2147483647 + table2_groupe | 1300053 | t | 57638 | 0 | 2147483647 + table3_groupe | 1300054 | t | 57637 | -2147483648 | -1 + table3_groupe | 1300054 | t | 57638 | -2147483648 | -1 + table3_groupe | 1300055 | t | 57637 | 0 | 2147483647 + table3_groupe | 1300055 | t | 57638 | 0 | 2147483647 + schema_colocation.table4_groupe | 1300056 | t | 57637 | -2147483648 | -1 + schema_colocation.table4_groupe | 1300056 | t | 57638 | -2147483648 | -1 + schema_colocation.table4_groupe | 1300057 | t | 57637 | 0 | 2147483647 + schema_colocation.table4_groupe | 1300057 | t | 57638 | 0 | 2147483647 + table1_group_none_1 | 1300058 | t | 57637 | -2147483648 | -1 + table1_group_none_1 | 1300058 | t | 57638 | -2147483648 | -1 + table1_group_none_1 | 1300059 | t | 57637 | 0 | 2147483647 + table1_group_none_1 | 1300059 | t | 57638 | 0 | 2147483647 + table2_group_none_1 | 1300060 | t | 57637 | -2147483648 | -1 + table2_group_none_1 | 1300060 | t | 57638 | -2147483648 | -1 + table2_group_none_1 | 1300061 | t | 57637 | 0 | 2147483647 + table2_group_none_1 | 1300061 | t | 57638 | 0 | 2147483647 + table1_group_none_2 | 1300062 | t | 57637 | -2147483648 | -1 + table1_group_none_2 | 1300062 | t | 57638 | -2147483648 | -1 + table1_group_none_2 | 1300063 | t | 57637 | 0 | 2147483647 + table1_group_none_2 | 1300063 | t | 57638 | 0 | 2147483647 + table4_groupe | 1300064 | t | 57637 | -2147483648 | -1 + table4_groupe | 1300064 | t | 57638 | 
-2147483648 | -1 + table4_groupe | 1300065 | t | 57637 | 0 | 2147483647 + table4_groupe | 1300065 | t | 57638 | 0 | 2147483647 + table1_group_none_3 | 1300066 | t | 57637 | -2147483648 | -715827884 + table1_group_none_3 | 1300066 | t | 57638 | -2147483648 | -715827884 + table1_group_none_3 | 1300067 | t | 57637 | -715827883 | 715827881 + table1_group_none_3 | 1300067 | t | 57638 | -715827883 | 715827881 + table1_group_none_3 | 1300068 | t | 57637 | 715827882 | 2147483647 + table1_group_none_3 | 1300068 | t | 57638 | 715827882 | 2147483647 + table1_group_default | 1300069 | t | 57637 | -2147483648 | -715827884 + table1_group_default | 1300069 | t | 57638 | -2147483648 | -715827884 + table1_group_default | 1300070 | t | 57637 | -715827883 | 715827881 + table1_group_default | 1300070 | t | 57638 | -715827883 | 715827881 + table1_group_default | 1300071 | t | 57637 | 715827882 | 2147483647 + table1_group_default | 1300071 | t | 57638 | 715827882 | 2147483647 table1_groupf | 1300080 | t | 57637 | | table1_groupf | 1300080 | t | 57638 | | table2_groupf | 1300081 | t | 57637 | | table2_groupf | 1300081 | t | 57638 | | -(108 rows) +(92 rows) -- reset colocation ids to test update_distributed_table_colocation ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1; @@ -862,7 +834,7 @@ ERROR: cannot colocate tables table1_groupd and table1_groupb DETAIL: Shard counts don't match for table1_groupd and table1_groupb. SELECT update_distributed_table_colocation('table1_groupB', colocate_with => 'table1_groupE'); ERROR: cannot colocate tables table1_groupe and table1_groupb -DETAIL: Shard 1300058 of table1_groupe and shard xxxxx of table1_groupb have different number of shard placements. +DETAIL: Shard 1300050 of table1_groupe and shard xxxxx of table1_groupb have different number of shard placements. 
SELECT update_distributed_table_colocation('table1_groupB', colocate_with => 'table1_groupF'); ERROR: relation table1_groupf should be a hash distributed table SELECT update_distributed_table_colocation('table1_groupB', colocate_with => 'table1_groupD'); @@ -1355,4 +1327,3 @@ DROP TABLE range_table; DROP TABLE none; DROP TABLE ref; DROP TABLE local_table; -DROP FOREIGN TABLE table3_groupD CASCADE; diff --git a/src/test/regress/expected/multi_create_shards.out b/src/test/regress/expected/multi_create_shards.out index 122aa2081..b7e0778e6 100644 --- a/src/test/regress/expected/multi_create_shards.out +++ b/src/test/regress/expected/multi_create_shards.out @@ -149,44 +149,6 @@ SELECT COUNT(*) FROM pg_class WHERE relname LIKE 'throwaway%' AND relkind = 'r'; 0 (1 row) --- test foreign table creation -CREATE FOREIGN TABLE foreign_table_to_distribute -( - name text, - id bigint -) -SERVER fake_fdw_server; -SET citus.shard_count TO 16; -SET citus.shard_replication_factor TO 1; -SELECT create_distributed_table('foreign_table_to_distribute', 'id', 'hash'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -SELECT shardstorage, shardminvalue, shardmaxvalue FROM pg_dist_shard - WHERE logicalrelid = 'foreign_table_to_distribute'::regclass - ORDER BY (shardminvalue::integer) ASC; - shardstorage | shardminvalue | shardmaxvalue ---------------------------------------------------------------------- - f | -2147483648 | -1879048193 - f | -1879048192 | -1610612737 - f | -1610612736 | -1342177281 - f | -1342177280 | -1073741825 - f | -1073741824 | -805306369 - f | -805306368 | -536870913 - f | -536870912 | -268435457 - f | -268435456 | -1 - f | 0 | 268435455 - f | 268435456 | 536870911 - f | 536870912 | 805306367 - f | 805306368 | 1073741823 - f | 1073741824 | 1342177279 - f | 1342177280 | 1610612735 - f | 1610612736 | 1879048191 - f | 1879048192 | 2147483647 -(16 rows) - -- test shard creation using weird shard count CREATE TABLE weird_shard_count ( @@ -216,11 +178,3 @@ SELECT shardmaxvalue::integer - shardminvalue::integer AS shard_size 613566759 (7 rows) --- cleanup foreign table, related shards and shard placements -DELETE FROM pg_dist_shard_placement - WHERE shardid IN (SELECT shardid FROM pg_dist_shard - WHERE logicalrelid = 'foreign_table_to_distribute'::regclass); -DELETE FROM pg_dist_shard - WHERE logicalrelid = 'foreign_table_to_distribute'::regclass; -DELETE FROM pg_dist_partition - WHERE logicalrelid = 'foreign_table_to_distribute'::regclass; diff --git a/src/test/regress/expected/multi_generate_ddl_commands.out b/src/test/regress/expected/multi_generate_ddl_commands.out index fa7fdc211..6aae20f9b 100644 --- a/src/test/regress/expected/multi_generate_ddl_commands.out +++ b/src/test/regress/expected/multi_generate_ddl_commands.out @@ -166,62 +166,12 @@ SELECT master_get_table_ddl_events('fiddly_table'); ALTER TABLE public.fiddly_table OWNER TO postgres (3 rows) --- test foreign tables using fake FDW -CREATE FOREIGN TABLE foreign_table ( - id bigint not null, - full_name text not null default '' -) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true'); -SELECT create_distributed_table('foreign_table', 'id'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -ALTER FOREIGN TABLE foreign_table rename to renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890; -NOTICE: identifier 
"renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890" will be truncated to "renamed_foreign_table_with_long_name_12345678901234567890123456" -ALTER FOREIGN TABLE renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890 rename full_name to rename_name; -NOTICE: identifier "renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890" will be truncated to "renamed_foreign_table_with_long_name_12345678901234567890123456" -ALTER FOREIGN TABLE renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890 alter rename_name type char(8); -NOTICE: identifier "renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890" will be truncated to "renamed_foreign_table_with_long_name_12345678901234567890123456" -\c - - :public_worker_1_host :worker_1_port -select table_name, column_name, data_type -from information_schema.columns -where table_schema='public' and table_name like 'renamed_foreign_table_%' and column_name <> 'id' -order by table_name; - table_name | column_name | data_type ---------------------------------------------------------------------- - renamed_foreign_table_with_long_name_1234567890_6a8dd6f8_610008 | rename_name | character - renamed_foreign_table_with_long_name_1234567890_6a8dd6f8_610009 | rename_name | character - renamed_foreign_table_with_long_name_1234567890_6a8dd6f8_610010 | rename_name | character - renamed_foreign_table_with_long_name_1234567890_6a8dd6f8_610011 | rename_name | character -(4 rows) - -\c - - :master_host :master_port -SELECT master_get_table_ddl_events('renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890'); - master_get_table_ddl_events ---------------------------------------------------------------------- - CREATE FOREIGN TABLE public.renamed_foreign_table_with_long_name_12345678901234567890123456 (id bigint NOT NULL, rename_name character(8) DEFAULT ''::text NOT NULL) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true') - ALTER TABLE public.renamed_foreign_table_with_long_name_12345678901234567890123456 OWNER TO postgres -(2 rows) - -- propagating views is not supported CREATE VIEW local_view AS SELECT * FROM simple_table; SELECT master_get_table_ddl_events('local_view'); ERROR: local_view is not a regular, foreign or partitioned table -- clean up DROP VIEW IF EXISTS local_view; -DROP FOREIGN TABLE IF EXISTS renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890; -NOTICE: identifier "renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890" will be truncated to "renamed_foreign_table_with_long_name_12345678901234567890123456" -\c - - :public_worker_1_host :worker_1_port -select table_name, column_name, data_type -from information_schema.columns -where table_schema='public' and table_name like 'renamed_foreign_table_%' and column_name <> 'id' -order by table_name; - table_name | column_name | data_type ---------------------------------------------------------------------- -(0 rows) - -\c - - :master_host :master_port DROP TABLE IF EXISTS simple_table, not_null_table, column_constraint_table, table_constraint_table, default_value_table, pkey_table, unique_table, clustered_table, fiddly_table; diff --git a/src/test/regress/expected/multi_repair_shards.out b/src/test/regress/expected/multi_repair_shards.out index ba4d2b1b0..c7fc05080 100644 --- a/src/test/regress/expected/multi_repair_shards.out +++ 
b/src/test/regress/expected/multi_repair_shards.out @@ -92,30 +92,3 @@ SELECT * FROM customer_engagements; 1 | 03-01-2015 | third event (3 rows) --- now do the same test over again with a foreign table -CREATE FOREIGN TABLE remote_engagements ( - id integer, - created_at date, - event_data text -) SERVER fake_fdw_server; --- distribute the table --- create a single shard on the first worker -SET citus.shard_count TO 1; -SET citus.shard_replication_factor TO 2; -SELECT create_distributed_table('remote_engagements', 'id', 'hash'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - --- get the newshardid -SELECT shardid as remotenewshardid FROM pg_dist_shard WHERE logicalrelid = 'remote_engagements'::regclass -\gset --- now, update the second placement as unhealthy -UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :remotenewshardid AND groupid = :worker_2_group; --- oops! we don't support repairing shards backed by foreign tables -SELECT master_copy_shard_placement(:remotenewshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); -ERROR: cannot repair shard -DETAIL: Table remote_engagements is a foreign table. Repairing shards backed by foreign tables is not supported. --- clean-up -DROP FOREIGN TABLE remote_engagements CASCADE; diff --git a/src/test/regress/expected/undistribute_table.out b/src/test/regress/expected/undistribute_table.out index c35c12be2..5b646da32 100644 --- a/src/test/regress/expected/undistribute_table.out +++ b/src/test/regress/expected/undistribute_table.out @@ -128,45 +128,6 @@ SELECT undistribute_table('referencing_table'); ERROR: cannot complete operation because table referencing_table has a foreign key HINT: Use cascade option to undistribute all the relations involved in a foreign key relationship with undistribute_table.referencing_table by executing SELECT undistribute_table($$undistribute_table.referencing_table$$, cascade_via_foreign_keys=>true) DROP TABLE referenced_table, referencing_table; --- test distributed foreign tables --- we expect errors --- and we need metadata sync off for foreign tables -SELECT stop_metadata_sync_to_node(nodename, nodeport) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary'; -NOTICE: dropping metadata on the node (localhost,57638) -NOTICE: dropping metadata on the node (localhost,57637) - stop_metadata_sync_to_node ---------------------------------------------------------------------- - - -(2 rows) - -CREATE FOREIGN TABLE foreign_table ( - id bigint not null, - full_name text not null default '' -) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true'); -SELECT create_distributed_table('foreign_table', 'id'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - -SELECT undistribute_table('foreign_table'); -NOTICE: creating a new table for undistribute_table.foreign_table -NOTICE: dropping the old undistribute_table.foreign_table -NOTICE: renaming the new table to undistribute_table.foreign_table - undistribute_table ---------------------------------------------------------------------- - -(1 row) - -DROP FOREIGN TABLE foreign_table; -SELECT start_metadata_sync_to_node(nodename, nodeport) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary'; - start_metadata_sync_to_node ---------------------------------------------------------------------- - - -(2 rows) - -- test partitioned tables CREATE TABLE partitioned_table (id INT, a INT) PARTITION BY RANGE (id); 
CREATE TABLE partitioned_table_1_5 PARTITION OF partitioned_table FOR VALUES FROM (1) TO (5); diff --git a/src/test/regress/sql/foreign_tables_mx.sql b/src/test/regress/sql/foreign_tables_mx.sql index 080f7cff1..a7c3138df 100644 --- a/src/test/regress/sql/foreign_tables_mx.sql +++ b/src/test/regress/sql/foreign_tables_mx.sql @@ -135,10 +135,10 @@ ALTER USER MAPPING FOR postgres SERVER foreign_server OPTIONS (SET user 'postgre -- test undistributing DELETE FROM foreign_table; SELECT undistribute_table('foreign_table'); + +-- both should error out SELECT create_distributed_table('foreign_table','data'); -SELECT undistribute_table('foreign_table'); SELECT create_reference_table('foreign_table'); -SELECT undistribute_table('foreign_table'); INSERT INTO foreign_table_test VALUES (1, 'testt'); SELECT * FROM foreign_table ORDER BY a; diff --git a/src/test/regress/sql/mixed_relkind_tests.sql b/src/test/regress/sql/mixed_relkind_tests.sql index e4c7e6624..8e258b7d1 100644 --- a/src/test/regress/sql/mixed_relkind_tests.sql +++ b/src/test/regress/sql/mixed_relkind_tests.sql @@ -47,9 +47,6 @@ SELECT create_distributed_table('partitioned_distributed_table', 'a'); CREATE VIEW view_on_part_dist AS SELECT * FROM partitioned_distributed_table; CREATE MATERIALIZED VIEW mat_view_on_part_dist AS SELECT * FROM partitioned_distributed_table; -CREATE FOREIGN TABLE foreign_distributed_table (a int, b int) SERVER fake_fdw_server; -SELECT create_distributed_table('foreign_distributed_table', 'a'); - -- and insert some data INSERT INTO postgres_local_table SELECT * FROM generate_series(0, 5); INSERT INTO partitioned_postgres_local_table SELECT * FROM generate_series(0, 5); @@ -65,7 +62,6 @@ SELECT * FROM partitioned_distributed_table UNION SELECT 1, * FROM postgres_loca SELECT * FROM partitioned_distributed_table UNION SELECT * FROM unlogged_distributed_table ORDER BY 1,2; SELECT *, 1 FROM postgres_local_table UNION SELECT * FROM unlogged_distributed_table ORDER BY 1,2; SELECT * FROM unlogged_distributed_table UNION SELECT 1,1 ORDER BY 1,2; -SELECT * from foreign_distributed_table UNION SELECT 1,1 ORDER BY 1,2; SELECT 1 UNION SELECT * FROM citus_local_table ORDER BY 1; SELECT * FROM view_on_part_dist UNION SELECT 1,1 ORDER BY 1,2; @@ -117,11 +113,6 @@ SELECT COUNT(*) FROM (SELECT *, random() FROM partitioned_distributed_table) AS bar WHERE foo.a = bar.b; -SELECT COUNT(*) FROM - (SELECT *, random() FROM unlogged_distributed_table) AS foo, - (SELECT *, random() FROM foreign_distributed_table) AS bar -WHERE foo.a = bar.b; - UPDATE partitioned_distributed_table SET b = foo.a FROM citus_local_table AS foo; UPDATE partitioned_distributed_table SET b = foo.a FROM postgres_local_table AS foo; UPDATE partitioned_distributed_table SET a = foo.a FROM postgres_local_table AS foo WHERE foo.a = partitioned_distributed_table.a; @@ -161,9 +152,6 @@ WITH cte_1 AS MATERIALIZED (SELECT * FROM partitioned_distributed_table) WITH cte_1 AS MATERIALIZED (SELECT * FROM partitioned_distributed_table) SELECT COUNT(*) FROM cte_1 JOIN partitioned_distributed_table USING (a); -WITH cte_1 AS MATERIALIZED (SELECT * FROM foreign_distributed_table) - SELECT COUNT(*) FROM cte_1 JOIN foreign_distributed_table USING (a); - WITH cte_1 AS MATERIALIZED (SELECT * FROM partitioned_distributed_table) SELECT COUNT(*) FROM cte_1 JOIN partitioned_distributed_table USING (b); @@ -245,22 +233,12 @@ EXPLAIN (COSTS OFF) SELECT a, COUNT(*) OVER (PARTITION BY a) FROM partitioned_distributed_table ORDER BY 1,2; $Q$); -SELECT public.coordinator_plan($Q$ -EXPLAIN 
(COSTS OFF) -SELECT a, COUNT(*) OVER (PARTITION BY a) FROM foreign_distributed_table ORDER BY 1,2; -$Q$); - -- pull to coordinator WINDOW SELECT public.coordinator_plan($Q$ EXPLAIN (COSTS OFF) SELECT a, COUNT(*) OVER (PARTITION BY a+1) FROM partitioned_distributed_table ORDER BY 1,2; $Q$); -SELECT public.coordinator_plan($Q$ -EXPLAIN (COSTS OFF) -SELECT a, COUNT(*) OVER (PARTITION BY a+1) FROM foreign_distributed_table ORDER BY 1,2; -$Q$); - -- FOR UPDATE SELECT * FROM partitioned_distributed_table WHERE a = 1 ORDER BY 1,2 FOR UPDATE; SELECT * FROM unlogged_distributed_table WHERE a = 1 ORDER BY 1,2 FOR UPDATE; @@ -276,10 +254,5 @@ BEGIN; SELECT * FROM partitioned_distributed_table; COMMIT; -BEGIN; - ALTER TABLE foreign_distributed_table DROP COLUMN b CASCADE; - SELECT * FROM foreign_distributed_table; -COMMIT; - -- cleanup at exit DROP SCHEMA mixed_relkind_tests CASCADE; diff --git a/src/test/regress/sql/multi_colocation_utils.sql b/src/test/regress/sql/multi_colocation_utils.sql index 63b122405..af33835d2 100644 --- a/src/test/regress/sql/multi_colocation_utils.sql +++ b/src/test/regress/sql/multi_colocation_utils.sql @@ -202,10 +202,6 @@ SELECT create_distributed_table('table_append', 'id', 'append'); CREATE TABLE table_range ( id int ); SELECT create_distributed_table('table_range', 'id', 'range'); --- test foreign table creation -CREATE FOREIGN TABLE table3_groupD ( id int ) SERVER fake_fdw_server; -SELECT create_distributed_table('table3_groupD', 'id'); - -- check metadata SELECT * FROM pg_dist_colocation WHERE colocationid >= 1 AND colocationid < 1000 @@ -290,8 +286,8 @@ CREATE TABLE table_bigint ( id bigint ); SELECT create_distributed_table('table_bigint', 'id', colocate_with => 'table1_groupE'); -- check worker table schemas \c - - - :worker_1_port -SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table3_groupE_1300062'::regclass; -SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='schema_colocation.table4_groupE_1300064'::regclass; +SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.table3_groupE_1300054'::regclass; +SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='schema_colocation.table4_groupE_1300056'::regclass; \c - - - :master_port SET citus.next_shard_id TO 1300080; @@ -553,4 +549,3 @@ DROP TABLE range_table; DROP TABLE none; DROP TABLE ref; DROP TABLE local_table; -DROP FOREIGN TABLE table3_groupD CASCADE; diff --git a/src/test/regress/sql/multi_create_shards.sql b/src/test/regress/sql/multi_create_shards.sql index 6037fc17b..d7d787e89 100644 --- a/src/test/regress/sql/multi_create_shards.sql +++ b/src/test/regress/sql/multi_create_shards.sql @@ -106,22 +106,6 @@ SELECT sort_names('sumedh', 'jason', 'ozgun'); SELECT COUNT(*) FROM pg_class WHERE relname LIKE 'throwaway%' AND relkind = 'r'; --- test foreign table creation -CREATE FOREIGN TABLE foreign_table_to_distribute -( - name text, - id bigint -) -SERVER fake_fdw_server; - -SET citus.shard_count TO 16; -SET citus.shard_replication_factor TO 1; -SELECT create_distributed_table('foreign_table_to_distribute', 'id', 'hash'); - -SELECT shardstorage, shardminvalue, shardmaxvalue FROM pg_dist_shard - WHERE logicalrelid = 'foreign_table_to_distribute'::regclass - ORDER BY (shardminvalue::integer) ASC; - -- test shard creation using weird shard count CREATE TABLE weird_shard_count ( @@ -137,14 +121,3 @@ SELECT shardmaxvalue::integer - shardminvalue::integer AS shard_size FROM pg_dist_shard WHERE logicalrelid = 'weird_shard_count'::regclass ORDER BY 
shardminvalue::integer ASC; - --- cleanup foreign table, related shards and shard placements -DELETE FROM pg_dist_shard_placement - WHERE shardid IN (SELECT shardid FROM pg_dist_shard - WHERE logicalrelid = 'foreign_table_to_distribute'::regclass); - -DELETE FROM pg_dist_shard - WHERE logicalrelid = 'foreign_table_to_distribute'::regclass; - -DELETE FROM pg_dist_partition - WHERE logicalrelid = 'foreign_table_to_distribute'::regclass; diff --git a/src/test/regress/sql/multi_generate_ddl_commands.sql b/src/test/regress/sql/multi_generate_ddl_commands.sql index f46225c73..4237d62b0 100644 --- a/src/test/regress/sql/multi_generate_ddl_commands.sql +++ b/src/test/regress/sql/multi_generate_ddl_commands.sql @@ -116,25 +116,6 @@ ALTER TABLE fiddly_table SELECT master_get_table_ddl_events('fiddly_table'); --- test foreign tables using fake FDW -CREATE FOREIGN TABLE foreign_table ( - id bigint not null, - full_name text not null default '' -) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true'); - -SELECT create_distributed_table('foreign_table', 'id'); -ALTER FOREIGN TABLE foreign_table rename to renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890; -ALTER FOREIGN TABLE renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890 rename full_name to rename_name; -ALTER FOREIGN TABLE renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890 alter rename_name type char(8); -\c - - :public_worker_1_host :worker_1_port -select table_name, column_name, data_type -from information_schema.columns -where table_schema='public' and table_name like 'renamed_foreign_table_%' and column_name <> 'id' -order by table_name; -\c - - :master_host :master_port - -SELECT master_get_table_ddl_events('renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890'); - -- propagating views is not supported CREATE VIEW local_view AS SELECT * FROM simple_table; @@ -142,13 +123,6 @@ SELECT master_get_table_ddl_events('local_view'); -- clean up DROP VIEW IF EXISTS local_view; -DROP FOREIGN TABLE IF EXISTS renamed_foreign_table_with_long_name_12345678901234567890123456789012345678901234567890; -\c - - :public_worker_1_host :worker_1_port -select table_name, column_name, data_type -from information_schema.columns -where table_schema='public' and table_name like 'renamed_foreign_table_%' and column_name <> 'id' -order by table_name; -\c - - :master_host :master_port DROP TABLE IF EXISTS simple_table, not_null_table, column_constraint_table, table_constraint_table, default_value_table, pkey_table, unique_table, clustered_table, fiddly_table; diff --git a/src/test/regress/sql/multi_repair_shards.sql b/src/test/regress/sql/multi_repair_shards.sql index 98085b3c9..f910585cb 100644 --- a/src/test/regress/sql/multi_repair_shards.sql +++ b/src/test/regress/sql/multi_repair_shards.sql @@ -80,29 +80,3 @@ UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :newshardid AND grou -- get the data from the second placement SELECT * FROM customer_engagements; - --- now do the same test over again with a foreign table -CREATE FOREIGN TABLE remote_engagements ( - id integer, - created_at date, - event_data text -) SERVER fake_fdw_server; - --- distribute the table --- create a single shard on the first worker -SET citus.shard_count TO 1; -SET citus.shard_replication_factor TO 2; -SELECT create_distributed_table('remote_engagements', 'id', 'hash'); - --- get the newshardid -SELECT shardid as 
remotenewshardid FROM pg_dist_shard WHERE logicalrelid = 'remote_engagements'::regclass -\gset - --- now, update the second placement as unhealthy -UPDATE pg_dist_placement SET shardstate = 3 WHERE shardid = :remotenewshardid AND groupid = :worker_2_group; - --- oops! we don't support repairing shards backed by foreign tables -SELECT master_copy_shard_placement(:remotenewshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); - --- clean-up -DROP FOREIGN TABLE remote_engagements CASCADE; diff --git a/src/test/regress/sql/undistribute_table.sql b/src/test/regress/sql/undistribute_table.sql index 71e9a72d9..01ec4629d 100644 --- a/src/test/regress/sql/undistribute_table.sql +++ b/src/test/regress/sql/undistribute_table.sql @@ -52,20 +52,6 @@ SELECT undistribute_table('referencing_table'); DROP TABLE referenced_table, referencing_table; --- test distributed foreign tables --- we expect errors --- and we need metadata sync off for foreign tables -SELECT stop_metadata_sync_to_node(nodename, nodeport) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary'; -CREATE FOREIGN TABLE foreign_table ( - id bigint not null, - full_name text not null default '' -) SERVER fake_fdw_server OPTIONS (encoding 'utf-8', compression 'true'); -SELECT create_distributed_table('foreign_table', 'id'); -SELECT undistribute_table('foreign_table'); - -DROP FOREIGN TABLE foreign_table; -SELECT start_metadata_sync_to_node(nodename, nodeport) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary'; - -- test partitioned tables CREATE TABLE partitioned_table (id INT, a INT) PARTITION BY RANGE (id); CREATE TABLE partitioned_table_1_5 PARTITION OF partitioned_table FOR VALUES FROM (1) TO (5); From 8d1b1886201480f6d3aad4f63436b6248777bd8a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96nder=20Kalac=C4=B1?= Date: Fri, 7 Jan 2022 17:24:31 +0100 Subject: [PATCH 3/6] Enable MX for the remaining failure tests (#5606) --- .../failure_create_distributed_table_non_empty.out | 12 ++++++++++++ src/test/regress/expected/failure_create_table.out | 8 +++++++- .../regress/expected/failure_single_select.out | 14 ++++++++++++++ src/test/regress/failure_schedule | 3 --- .../failure_create_distributed_table_non_empty.sql | 6 ++++++ src/test/regress/sql/failure_create_table.sql | 1 + src/test/regress/sql/failure_single_select.sql | 7 +++++++ 7 files changed, 47 insertions(+), 4 deletions(-) diff --git a/src/test/regress/expected/failure_create_distributed_table_non_empty.out b/src/test/regress/expected/failure_create_distributed_table_non_empty.out index 3de2cd874..13f6cdffa 100644 --- a/src/test/regress/expected/failure_create_distributed_table_non_empty.out +++ b/src/test/regress/expected/failure_create_distributed_table_non_empty.out @@ -872,6 +872,18 @@ SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_ 4 (1 row) +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 2 +(1 row) + DROP TABLE test_table; CREATE TABLE test_table(id int, value_1 int); INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); diff --git a/src/test/regress/expected/failure_create_table.out b/src/test/regress/expected/failure_create_table.out index e37060735..14b3daa66 100644 --- a/src/test/regress/expected/failure_create_table.out +++ b/src/test/regress/expected/failure_create_table.out 
@@ -289,6 +289,12 @@ SELECT citus.mitmproxy('conn.allow()'); (1 row) +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 1 +(1 row) + SELECT count(*) FROM pg_dist_shard; count --------------------------------------------------------------------- @@ -421,7 +427,7 @@ COMMIT; SELECT recover_prepared_transactions(); recover_prepared_transactions --------------------------------------------------------------------- - 4 + 0 (1 row) SELECT citus.mitmproxy('conn.allow()'); diff --git a/src/test/regress/expected/failure_single_select.out b/src/test/regress/expected/failure_single_select.out index c7ee9d9d1..4cfa1252b 100644 --- a/src/test/regress/expected/failure_single_select.out +++ b/src/test/regress/expected/failure_single_select.out @@ -12,6 +12,8 @@ SELECT citus.clear_network_traffic(); SET citus.shard_count = 2; SET citus.shard_replication_factor = 2; +-- this test is designed such that no modification lock is acquired +SET citus.allow_modifications_from_workers_to_replicated_tables TO false; CREATE TABLE select_test (key int, value text); SELECT create_distributed_table('select_test', 'key'); create_distributed_table @@ -60,6 +62,12 @@ ERROR: connection to the remote node localhost:xxxxx failed with the following This probably means the server terminated abnormally before or while processing the request. COMMIT; +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + TRUNCATE select_test; -- now the same tests with query cancellation -- put data in shard for which mitm node is first placement @@ -96,6 +104,12 @@ WHERE shardid IN ( 1 (1 row) +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + TRUNCATE select_test; -- cancel the second query -- error after second SELECT; txn should fail diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule index 550544a7f..18a45fd26 100644 --- a/src/test/regress/failure_schedule +++ b/src/test/regress/failure_schedule @@ -16,12 +16,9 @@ test: failure_add_disable_node test: failure_copy_to_reference test: failure_copy_on_hash test: failure_create_reference_table -test: check_mx -test: turn_mx_off test: failure_create_distributed_table_non_empty test: failure_create_table test: failure_single_select -test: turn_mx_on test: failure_multi_shard_update_delete test: failure_cte_subquery diff --git a/src/test/regress/sql/failure_create_distributed_table_non_empty.sql b/src/test/regress/sql/failure_create_distributed_table_non_empty.sql index 21350aee5..29dc7a2d7 100644 --- a/src/test/regress/sql/failure_create_distributed_table_non_empty.sql +++ b/src/test/regress/sql/failure_create_distributed_table_non_empty.sql @@ -304,6 +304,10 @@ BEGIN; SELECT create_distributed_table('test_table', 'id'); COMMIT; SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; + +SELECT citus.mitmproxy('conn.allow()'); +SELECT recover_prepared_transactions(); + DROP TABLE test_table; CREATE TABLE test_table(id int, value_1 int); INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); @@ -314,7 +318,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").cancel(' || pg_backend_pi BEGIN; SELECT create_distributed_table('test_table', 'id'); COMMIT; + SELECT citus.mitmproxy('conn.allow()'); + SELECT count(*) FROM pg_dist_shard WHERE 
logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass; DROP TABLE test_table; CREATE TABLE test_table(id int, value_1 int); diff --git a/src/test/regress/sql/failure_create_table.sql b/src/test/regress/sql/failure_create_table.sql index c10c16c30..a4035b431 100644 --- a/src/test/regress/sql/failure_create_table.sql +++ b/src/test/regress/sql/failure_create_table.sql @@ -99,6 +99,7 @@ SELECT citus.mitmproxy('conn.onCommandComplete(command="PREPARE TRANSACTION").ki SELECT create_distributed_table('test_table','id',colocate_with=>'temp_table'); SELECT citus.mitmproxy('conn.allow()'); +SELECT recover_prepared_transactions(); SELECT count(*) FROM pg_dist_shard; SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'failure_create_table' and table_name LIKE 'test_table%' ORDER BY 1$$); diff --git a/src/test/regress/sql/failure_single_select.sql b/src/test/regress/sql/failure_single_select.sql index f39677e1e..9a4a82d12 100644 --- a/src/test/regress/sql/failure_single_select.sql +++ b/src/test/regress/sql/failure_single_select.sql @@ -4,6 +4,9 @@ SELECT citus.clear_network_traffic(); SET citus.shard_count = 2; SET citus.shard_replication_factor = 2; +-- this test is designed such that no modification lock is acquired +SET citus.allow_modifications_from_workers_to_replicated_tables TO false; + CREATE TABLE select_test (key int, value text); SELECT create_distributed_table('select_test', 'key'); @@ -23,6 +26,8 @@ INSERT INTO select_test VALUES (3, 'more data'); SELECT * FROM select_test WHERE key = 3; COMMIT; +SELECT citus.mitmproxy('conn.allow()'); + TRUNCATE select_test; -- now the same tests with query cancellation @@ -47,6 +52,8 @@ SELECT DISTINCT shardstate FROM pg_dist_shard_placement WHERE shardid IN ( SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'select_test'::regclass ); + +SELECT citus.mitmproxy('conn.allow()'); TRUNCATE select_test; -- cancel the second query From ee3b50b026be27604b96573f80a1c5dc280d60d0 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Thu, 6 Jan 2022 13:58:33 +0100 Subject: [PATCH 4/6] Disallow remote execution from queries on shards --- src/backend/distributed/commands/call.c | 36 ++++--- src/backend/distributed/commands/multi_copy.c | 8 ++ .../distributed/commands/utility_hook.c | 10 ++ .../distributed/executor/adaptive_executor.c | 6 ++ .../distributed/executor/multi_executor.c | 45 +++++++++ .../planner/function_call_delegation.c | 34 +++++-- .../test/run_from_same_connection.c | 6 ++ .../transaction/transaction_management.c | 2 + .../distributed/commands/utility_hook.h | 2 + .../distributed/function_call_delegation.h | 8 ++ src/include/distributed/multi_executor.h | 2 + .../expected/citus_local_table_triggers.out | 93 ++++++++++++++++--- .../regress/expected/foreign_tables_mx.out | 14 +-- src/test/regress/expected/multi_mx_call.out | 84 ++++++++++++++--- src/test/regress/expected/multi_mx_call_0.out | 84 ++++++++++++++--- .../multi_mx_function_call_delegation.out | 60 ++++++++---- .../multi_mx_function_call_delegation_0.out | 60 ++++++++---- .../sql/citus_local_table_triggers.sql | 51 +++++++++- src/test/regress/sql/foreign_tables_mx.sql | 10 +- src/test/regress/sql/multi_mx_call.sql | 13 +++ .../sql/multi_mx_function_call_delegation.sql | 20 +++- 21 files changed, 531 insertions(+), 117 deletions(-) diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index dda4eb3de..af319f0ce 100644 --- a/src/backend/distributed/commands/call.c +++ 
b/src/backend/distributed/commands/call.c @@ -24,6 +24,7 @@ #include "distributed/commands/utility_hook.h" #include "distributed/connection_management.h" #include "distributed/deparse_shard_query.h" +#include "distributed/function_call_delegation.h" #include "distributed/metadata_utility.h" #include "distributed/metadata_cache.h" #include "distributed/multi_executor.h" @@ -46,9 +47,10 @@ #include "utils/lsyscache.h" #include "utils/syscache.h" -static bool CallFuncExprRemotely(CallStmt *callStmt, - DistObjectCacheEntry *procedure, - FuncExpr *funcExpr, DestReceiver *dest); + +/* global variable tracking whether we are in a delegated procedure call */ +bool InDelegatedProcedureCall = false; + /* * CallDistributedProcedureRemotely calls a stored procedure on the worker if possible. @@ -61,28 +63,21 @@ CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest) DistObjectCacheEntry *procedure = LookupDistObjectCacheEntry(ProcedureRelationId, functionId, 0); - - /* - * If procedure is not distributed or already delegated from another - * node, do not call the procedure remotely. - */ - if (procedure == NULL || !procedure->isDistributed || - IsCitusInitiatedRemoteBackend()) + if (procedure == NULL || !procedure->isDistributed) { return false; } - return CallFuncExprRemotely(callStmt, procedure, funcExpr, dest); -} + if (IsCitusInitiatedRemoteBackend()) + { + /* + * We are in a citus-initiated backend handling a CALL to a distributed + * procedure. That means that this is the delegated call. + */ + InDelegatedProcedureCall = true; + return false; + } - -/* - * CallFuncExprRemotely calls a procedure of function on the worker if possible. - */ -static bool -CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, - FuncExpr *funcExpr, DestReceiver *dest) -{ if (IsMultiStatementTransaction()) { ereport(DEBUG1, (errmsg("cannot push down CALL in multi-statement transaction"))); @@ -102,6 +97,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, "be constant expressions"))); return false; } + CitusTableCacheEntry *distTable = GetCitusTableCacheEntry(colocatedRelationId); Var *partitionColumn = distTable->partitionColumn; bool colocatedWithReferenceTable = false; diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index d683a2792..d2d7d9b23 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -3340,6 +3340,7 @@ InitializeCopyShardState(CopyShardState *shardState, { ListCell *placementCell = NULL; int failedPlacementCount = 0; + bool hasRemoteCopy = false; MemoryContext localContext = AllocSetContextCreateExtended(CurrentMemoryContext, @@ -3383,6 +3384,8 @@ InitializeCopyShardState(CopyShardState *shardState, continue; } + hasRemoteCopy = true; + MultiConnection *connection = CopyGetPlacementConnection(connectionStateHash, placement, colocatedIntermediateResult); @@ -3427,6 +3430,11 @@ InitializeCopyShardState(CopyShardState *shardState, ereport(ERROR, (errmsg("could not connect to any active placements"))); } + if (hasRemoteCopy) + { + EnsureRemoteTaskExecutionAllowed(); + } + /* * We just error out and code execution should never reach to this * point. This is the case for all tables. 
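To make the effect of these guards concrete: a statement that is already running on a shard, for example a trigger fired while a Citus local table's shard is updated, can no longer start remote execution. A minimal sketch of the now-rejected pattern, adapted from the citus_local_table_triggers test later in this patch (table and function names are illustrative, and a reference table named reference_table is assumed):

-- assumes another_citus_local_table is a Citus local table and
-- reference_table is a reference table in the same Citus cluster
CREATE FUNCTION insert_100() RETURNS trigger AS $insert_100$
BEGIN
    -- inserting into a reference table requires distributed execution
    INSERT INTO reference_table VALUES (100);
    RETURN NEW;
END;
$insert_100$ LANGUAGE plpgsql;

CREATE TRIGGER insert_100_trigger
AFTER UPDATE ON another_citus_local_table
FOR EACH STATEMENT EXECUTE FUNCTION insert_100();

-- the UPDATE itself executes on the local shard, so the trigger's INSERT now
-- fails with: cannot execute a distributed query from a query on a shard
UPDATE another_citus_local_table SET value = value - 1;
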
diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 672a7ce81..3e2a2d24b 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -227,10 +227,20 @@ multi_ProcessUtility(PlannedStmt *pstmt, params, queryEnv, dest, completionTag); StoredProcedureLevel -= 1; + + if (InDelegatedProcedureCall && StoredProcedureLevel == 0) + { + InDelegatedProcedureCall = false; + } } PG_CATCH(); { StoredProcedureLevel -= 1; + + if (InDelegatedProcedureCall && StoredProcedureLevel == 0) + { + InDelegatedProcedureCall = false; + } PG_RE_THROW(); } PG_END_TRY(); diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 0e9d96fd5..a90e49ced 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -1290,6 +1290,12 @@ StartDistributedExecution(DistributedExecution *execution) */ RecordParallelRelationAccessForTaskList(execution->remoteAndLocalTaskList); } + + /* make sure we are not doing remote execution from within a task */ + if (execution->remoteTaskList != NIL) + { + EnsureRemoteTaskExecutionAllowed(); + } } diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 5a535043d..afde1328c 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -21,9 +21,11 @@ #include "distributed/citus_custom_scan.h" #include "distributed/commands/multi_copy.h" #include "distributed/commands/utility_hook.h" +#include "distributed/function_call_delegation.h" #include "distributed/insert_select_executor.h" #include "distributed/insert_select_planner.h" #include "distributed/listutils.h" +#include "distributed/local_executor.h" #include "distributed/coordinator_protocol.h" #include "distributed/multi_executor.h" #include "distributed/combine_query_planner.h" @@ -719,3 +721,46 @@ ExecutorBoundParams(void) Assert(ExecutorLevel > 0); return executorBoundParams; } + + +/* + * EnsureRemoteTaskExecutionAllowed ensures that we do not perform remote + * execution from within a task. That could happen when the user calls + * a function in a query that gets pushed down to the worker, and the + * function performs a query on a distributed table. + */ +void +EnsureRemoteTaskExecutionAllowed(void) +{ + if (!InTaskExecution()) + { + /* we are not within a task, distributed execution is allowed */ + return; + } + + ereport(ERROR, (errmsg("cannot execute a distributed query from a query on a " + "shard"))); +} + + +/* + * InTaskExecution determines whether we are currently in a task execution. + */ +bool +InTaskExecution(void) +{ + if (LocalExecutorLevel > 0) + { + /* in a local task */ + return true; + } + + /* + * Normally, any query execution within a citus-initiated backend + * is considered a task execution, but an exception is when we + * are in a delegated function/procedure call. 
+ */ + return IsCitusInitiatedRemoteBackend() && + !InDelegatedFunctionCall && + !InDelegatedProcedureCall; +} diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index 6618e49e1..39cf334c6 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -58,6 +58,10 @@ struct ParamWalkerContext static bool contain_param_walker(Node *node, void *context); +/* global variable keeping track of whether we are in a delegated function call */ +bool InDelegatedFunctionCall = false; + + /* * contain_param_walker scans node for Param nodes. * Ignore the return value, instead check context afterwards. @@ -112,15 +116,6 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) } int32 localGroupId = GetLocalGroupId(); - if (localGroupId != COORDINATOR_GROUP_ID && IsCitusInitiatedRemoteBackend()) - { - /* - * Do not delegate from workers if it is initiated by Citus already. - * It means that this function has already been delegated to this node. - */ - return NULL; - } - if (localGroupId == GROUP_ID_UPGRADING) { /* do not delegate while upgrading */ @@ -218,6 +213,27 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) ereport(DEBUG4, (errmsg("function is distributed"))); } + if (IsCitusInitiatedRemoteBackend()) + { + /* + * We are planning a call to a distributed function within a Citus backend, + * that means that this is the delegated call. + */ + InDelegatedFunctionCall = true; + return NULL; + } + + if (localGroupId != COORDINATOR_GROUP_ID) + { + /* + * We are calling a distributed function on a worker node. We currently + * only delegate from the coordinator. + * + * TODO: remove this restriction. + */ + return NULL; + } + /* * Cannot delegate functions for INSERT ... SELECT func(), since they require * coordinated transactions. 
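The user-visible consequence of the delegation changes above, as exercised by the multi_mx_function_call_delegation tests later in this patch: a top-level call to a distributed function can still be delegated from the coordinator, while the same function invoked from a query that is routed to a shard is now rejected. A rough sketch, reusing mx_call_func and mx_call_dist_table_1 from those tests:

-- still eligible for delegation when called at the top level on the coordinator
SELECT mx_call_func(2, 0);

-- the outer query is routed to a shard of mx_call_dist_table_1, so the
-- distributed query inside the function body is rejected with:
-- ERROR:  cannot execute a distributed query from a query on a shard
SELECT mx_call_func(2, 0) FROM mx_call_dist_table_1 WHERE id = 3;
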
diff --git a/src/backend/distributed/test/run_from_same_connection.c b/src/backend/distributed/test/run_from_same_connection.c index e0b7d806c..3b5f804b4 100644 --- a/src/backend/distributed/test/run_from_same_connection.c +++ b/src/backend/distributed/test/run_from_same_connection.c @@ -113,6 +113,12 @@ start_session_level_connection_to_node(PG_FUNCTION_ARGS) elog(ERROR, "failed to connect to %s:%d", nodeNameString, (int) nodePort); } + /* pretend we are a regular client to avoid citus-initiated backend checks */ + const char *setAppName = + "SET application_name TO run_commands_on_session_level_connection_to_node"; + + ExecuteCriticalRemoteCommand(singleConnection, setAppName); + PG_RETURN_VOID(); } diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index ee9912fe9..4c4958015 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -23,6 +23,7 @@ #include "distributed/citus_safe_lib.h" #include "distributed/connection_management.h" #include "distributed/distributed_planner.h" +#include "distributed/function_call_delegation.h" #include "distributed/hash_helpers.h" #include "distributed/intermediate_results.h" #include "distributed/listutils.h" @@ -550,6 +551,7 @@ ResetGlobalVariables() ShouldCoordinatedTransactionUse2PC = false; TransactionModifiedNodeMetadata = false; MetadataSyncOnCommit = false; + InDelegatedFunctionCall = false; ResetWorkerErrorIndication(); } diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h index 9ead0df8b..1ee18a206 100644 --- a/src/include/distributed/commands/utility_hook.h +++ b/src/include/distributed/commands/utility_hook.h @@ -18,6 +18,7 @@ #include "tcop/utility.h" #include "distributed/coordinator_protocol.h" +#include "distributed/function_call_delegation.h" #include "distributed/version_compat.h" #include "distributed/worker_transaction.h" @@ -37,6 +38,7 @@ extern bool EnableAlterRolePropagation; extern bool EnableAlterRoleSetPropagation; extern bool EnableAlterDatabaseOwner; extern int UtilityHookLevel; +extern bool InDelegatedProcedureCall; /* diff --git a/src/include/distributed/function_call_delegation.h b/src/include/distributed/function_call_delegation.h index 865ac0ce1..7d3c61aea 100644 --- a/src/include/distributed/function_call_delegation.h +++ b/src/include/distributed/function_call_delegation.h @@ -15,6 +15,14 @@ #include "distributed/multi_physical_planner.h" +/* + * These flags keep track of whether the process is currently in a delegated + * function or procedure call. 
+ */ +extern bool InDelegatedFunctionCall; +extern bool InDelegatedProcedureCall; + + PlannedStmt * TryToDelegateFunctionCall(DistributedPlanningContext *planContext); diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 143f5a1c7..3648dbc1b 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -149,6 +149,8 @@ extern void ExtractParametersFromParamList(ParamListInfo paramListInfo, const char ***parameterValues, bool useOriginalCustomTypeOids); extern ParamListInfo ExecutorBoundParams(void); +extern void EnsureRemoteTaskExecutionAllowed(void); +extern bool InTaskExecution(void); #endif /* MULTI_EXECUTOR_H */ diff --git a/src/test/regress/expected/citus_local_table_triggers.out b/src/test/regress/expected/citus_local_table_triggers.out index ac6906282..1a269c649 100644 --- a/src/test/regress/expected/citus_local_table_triggers.out +++ b/src/test/regress/expected/citus_local_table_triggers.out @@ -392,6 +392,13 @@ BEGIN RETURN NEW; END; $insert_100$ LANGUAGE plpgsql; +CREATE TABLE local_table (value int); +CREATE FUNCTION insert_100_local() RETURNS trigger AS $insert_100$ +BEGIN + INSERT INTO local_table VALUES (100); + RETURN NEW; +END; +$insert_100$ LANGUAGE plpgsql; BEGIN; CREATE TRIGGER insert_100_trigger AFTER TRUNCATE ON another_citus_local_table @@ -416,6 +423,7 @@ NOTICE: executing the command locally: SELECT value FROM citus_local_table_trig (2 rows) ROLLBACK; +-- cannot perform remote execution from a trigger on a Citus local table BEGIN; -- update should actually update something to test ON UPDATE CASCADE logic INSERT INTO another_citus_local_table VALUES (600); @@ -436,11 +444,70 @@ NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1 FOR EACH STATEMENT EXECUTE FUNCTION insert_100();') UPDATE another_citus_local_table SET value=value-1;; NOTICE: executing the command locally: UPDATE citus_local_table_triggers.another_citus_local_table_1507009 another_citus_local_table SET value = (value OPERATOR(pg_catalog.-) 1) -NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.reference_table_1507010 (value) VALUES (100) -NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.reference_table_1507010 (value) VALUES (100) +ERROR: cannot execute a distributed query from a query on a shard +ROLLBACK; +-- can perform regular execution from a trigger on a Citus local table +BEGIN; + -- update should actually update something to test ON UPDATE CASCADE logic + INSERT INTO another_citus_local_table VALUES (600); +NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.another_citus_local_table_1507009 (value) VALUES (600) + INSERT INTO citus_local_table VALUES (600); +NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.citus_local_table_1507001 (value) VALUES (600) + CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON another_citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local(); +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1507009, 'citus_local_table_triggers', 'CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON another_citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();') + CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local(); +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1507001, 
'citus_local_table_triggers', 'CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();') + UPDATE another_citus_local_table SET value=value-1;; +NOTICE: executing the command locally: UPDATE citus_local_table_triggers.another_citus_local_table_1507009 another_citus_local_table SET value = (value OPERATOR(pg_catalog.-) 1) -- we should see two rows with "100" - SELECT * FROM reference_table; -NOTICE: executing the command locally: SELECT value FROM citus_local_table_triggers.reference_table_1507010 reference_table + SELECT * FROM local_table; + value +--------------------------------------------------------------------- + 100 + 100 +(2 rows) + +ROLLBACK; +-- can perform local execution from a trigger on a Citus local table +BEGIN; + SELECT citus_add_local_table_to_metadata('local_table'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + + -- update should actually update something to test ON UPDATE CASCADE logic + INSERT INTO another_citus_local_table VALUES (600); +NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.another_citus_local_table_1507009 (value) VALUES (600) + INSERT INTO citus_local_table VALUES (600); +NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.citus_local_table_1507001 (value) VALUES (600) + CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON another_citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local(); +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1507009, 'citus_local_table_triggers', 'CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON another_citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();') + CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local(); +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1507001, 'citus_local_table_triggers', 'CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local();') + UPDATE another_citus_local_table SET value=value-1;; +NOTICE: executing the command locally: UPDATE citus_local_table_triggers.another_citus_local_table_1507009 another_citus_local_table SET value = (value OPERATOR(pg_catalog.-) 1) +NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.local_table_1507011 (value) VALUES (100) +NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.local_table_1507011 (value) VALUES (100) + -- we should see two rows with "100" + SELECT * FROM local_table; +NOTICE: executing the command locally: SELECT value FROM citus_local_table_triggers.local_table_1507011 local_table value --------------------------------------------------------------------- 100 @@ -456,11 +523,11 @@ CREATE TABLE par_another_citus_local_table_1 PARTITION OF par_another_citus_loca ALTER TABLE par_another_citus_local_table ADD CONSTRAINT fkey_self FOREIGN KEY(val) REFERENCES par_another_citus_local_table(val); ALTER TABLE par_citus_local_table ADD CONSTRAINT fkey_c_to_c FOREIGN KEY(val) REFERENCES par_another_citus_local_table(val) ON UPDATE CASCADE; SELECT citus_add_local_table_to_metadata('par_another_citus_local_table', cascade_via_foreign_keys=>true); -NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507011, 'citus_local_table_triggers', 
1507012, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_another_citus_local_table ATTACH PARTITION citus_local_table_triggers.par_another_citus_local_table_1 FOR VALUES FROM (1) TO (10000);') -NOTICE: executing the command locally: SELECT pg_catalog.citus_run_local_command($$SELECT worker_fix_partition_shard_index_names('citus_local_table_triggers.par_another_citus_local_table_val_key_1507011'::regclass, 'citus_local_table_triggers.par_another_citus_local_table_1_1507012', 'par_another_citus_local_table_1_val_key_1507012')$$) -NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507013, 'citus_local_table_triggers', 1507014, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_citus_local_table ATTACH PARTITION citus_local_table_triggers.par_citus_local_table_1 FOR VALUES FROM (1) TO (10000);') -NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507011, 'citus_local_table_triggers', 1507011, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_another_citus_local_table ADD CONSTRAINT fkey_self FOREIGN KEY (val) REFERENCES citus_local_table_triggers.par_another_citus_local_table(val)') -NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507013, 'citus_local_table_triggers', 1507011, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_citus_local_table ADD CONSTRAINT fkey_c_to_c FOREIGN KEY (val) REFERENCES citus_local_table_triggers.par_another_citus_local_table(val) ON UPDATE CASCADE') +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507012, 'citus_local_table_triggers', 1507013, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_another_citus_local_table ATTACH PARTITION citus_local_table_triggers.par_another_citus_local_table_1 FOR VALUES FROM (1) TO (10000);') +NOTICE: executing the command locally: SELECT pg_catalog.citus_run_local_command($$SELECT worker_fix_partition_shard_index_names('citus_local_table_triggers.par_another_citus_local_table_val_key_1507012'::regclass, 'citus_local_table_triggers.par_another_citus_local_table_1_1507013', 'par_another_citus_local_table_1_val_key_1507013')$$) +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507014, 'citus_local_table_triggers', 1507015, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_citus_local_table ATTACH PARTITION citus_local_table_triggers.par_citus_local_table_1 FOR VALUES FROM (1) TO (10000);') +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507012, 'citus_local_table_triggers', 1507012, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_another_citus_local_table ADD CONSTRAINT fkey_self FOREIGN KEY (val) REFERENCES citus_local_table_triggers.par_another_citus_local_table(val)') +NOTICE: executing the command locally: SELECT worker_apply_inter_shard_ddl_command (1507014, 'citus_local_table_triggers', 1507012, 'citus_local_table_triggers', 'ALTER TABLE citus_local_table_triggers.par_citus_local_table ADD CONSTRAINT fkey_c_to_c FOREIGN KEY (val) REFERENCES citus_local_table_triggers.par_another_citus_local_table(val) ON UPDATE CASCADE') citus_add_local_table_to_metadata --------------------------------------------------------------------- @@ -489,7 +556,7 @@ BEGIN; TRUNCATE par_another_citus_local_table CASCADE; NOTICE: truncate cascades to table 
"par_citus_local_table" NOTICE: truncate cascades to table "par_citus_local_table_1" -NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.par_reference_table_1507015 (val) VALUES (100) +NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.par_reference_table_1507016 (val) VALUES (100) NOTICE: executing the command locally: TRUNCATE TABLE citus_local_table_triggers.par_another_citus_local_table_xxxxx CASCADE NOTICE: truncate cascades to table "par_citus_local_table_xxxxx" NOTICE: truncate cascades to table "par_citus_local_table_1_xxxxx" @@ -497,12 +564,12 @@ NOTICE: executing the command locally: TRUNCATE TABLE citus_local_table_trigger NOTICE: truncate cascades to table "par_citus_local_table_xxxxx" NOTICE: truncate cascades to table "par_citus_local_table_1_xxxxx" NOTICE: truncate cascades to table "par_another_citus_local_table_xxxxx" -NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.par_reference_table_1507015 (val) VALUES (100) +NOTICE: executing the command locally: INSERT INTO citus_local_table_triggers.par_reference_table_1507016 (val) VALUES (100) NOTICE: executing the command locally: TRUNCATE TABLE citus_local_table_triggers.par_citus_local_table_xxxxx CASCADE NOTICE: executing the command locally: TRUNCATE TABLE citus_local_table_triggers.par_citus_local_table_1_xxxxx CASCADE -- we should see two rows with "100" SELECT * FROM par_reference_table; -NOTICE: executing the command locally: SELECT val FROM citus_local_table_triggers.par_reference_table_1507015 par_reference_table +NOTICE: executing the command locally: SELECT val FROM citus_local_table_triggers.par_reference_table_1507016 par_reference_table val --------------------------------------------------------------------- 100 @@ -512,4 +579,4 @@ NOTICE: executing the command locally: SELECT val FROM citus_local_table_trigge ROLLBACK; -- cleanup at exit DROP SCHEMA citus_local_table_triggers, "interesting!schema" CASCADE; -NOTICE: drop cascades to 20 other objects +NOTICE: drop cascades to 22 other objects diff --git a/src/test/regress/expected/foreign_tables_mx.out b/src/test/regress/expected/foreign_tables_mx.out index a3de72259..17b1c99b8 100644 --- a/src/test/regress/expected/foreign_tables_mx.out +++ b/src/test/regress/expected/foreign_tables_mx.out @@ -66,16 +66,10 @@ ALTER FOREIGN TABLE public.foreign_table_newname ADD CONSTRAINT check_c_2 check( ALTER FOREIGN TABLE public.foreign_table_newname VALIDATE CONSTRAINT check_c_2; ALTER FOREIGN TABLE public.foreign_table_newname DROP constraint IF EXISTS check_c_2; -- trigger test -CREATE TABLE distributed_table(value int); -SELECT create_distributed_table('distributed_table', 'value'); - create_distributed_table ---------------------------------------------------------------------- - -(1 row) - +CREATE TABLE table42(value int); CREATE FUNCTION insert_42() RETURNS trigger AS $insert_42$ BEGIN - INSERT INTO distributed_table VALUES (42); + INSERT INTO table42 VALUES (42); RETURN NEW; END; $insert_42$ LANGUAGE plpgsql; @@ -85,7 +79,7 @@ FOR EACH ROW EXECUTE FUNCTION insert_42(); -- do the same pattern from the workers as well INSERT INTO public.foreign_table_newname VALUES (99, 'test_2'); delete from public.foreign_table_newname where id_test = 99; -select * from distributed_table ORDER BY value; +select * from table42 ORDER BY value; value --------------------------------------------------------------------- 42 @@ -96,7 +90,7 @@ alter foreign table public.foreign_table_newname disable trigger 
insert_42_trigg INSERT INTO public.foreign_table_newname VALUES (99, 'test_2'); delete from public.foreign_table_newname where id_test = 99; -- should not insert again as trigger disabled -select * from distributed_table ORDER BY value; +select * from table42 ORDER BY value; value --------------------------------------------------------------------- 42 diff --git a/src/test/regress/expected/multi_mx_call.out b/src/test/regress/expected/multi_mx_call.out index 7f077c77a..64b033d41 100644 --- a/src/test/regress/expected/multi_mx_call.out +++ b/src/test/regress/expected/multi_mx_call.out @@ -131,6 +131,12 @@ BEGIN y := x; x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); END;$$; +CREATE PROCEDURE mx_call_proc_copy(x int) +LANGUAGE plpgsql AS $$ +BEGIN + INSERT INTO multi_mx_call.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s; +END;$$; -- Test that undistributed procedures have no issue executing call multi_mx_call.mx_call_proc(2, 0); y @@ -144,6 +150,7 @@ call multi_mx_call.mx_call_proc_custom_types('S', 'A'); F | S (1 row) +call multi_mx_call.mx_call_proc_copy(2); -- Same for unqualified names call mx_call_proc(2, 0); y @@ -176,6 +183,12 @@ select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_ca (1 row) +select create_distributed_function('mx_call_proc_copy(int)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + -- We still don't route them to the workers, because they aren't -- colocated with any distributed tables. SET client_min_messages TO DEBUG1; @@ -206,6 +219,12 @@ DEBUG: stored procedure does not have co-located tables F | S (1 row) +call multi_mx_call.mx_call_proc_copy(2); +DEBUG: stored procedure does not have co-located tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +CONTEXT: SQL statement "INSERT INTO multi_mx_call.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s" +PL/pgSQL function mx_call_proc_copy(integer) line XX at SQL statement -- Mark them as colocated with a table. Now we should route them to workers. select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); colocate_proc_with_table @@ -225,6 +244,12 @@ select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table (1 row) +select colocate_proc_with_table('mx_call_proc_copy', 'mx_call_dist_table_1'::regclass, 0); + colocate_proc_with_table +--------------------------------------------------------------------- + +(1 row) + call multi_mx_call.mx_call_proc(2, 0); DEBUG: pushing down the procedure y @@ -253,6 +278,8 @@ DEBUG: pushing down the procedure S | S (1 row) +call mx_call_proc_copy(2); +DEBUG: pushing down the procedure -- Test implicit cast of int to bigint call mx_call_proc_bigint(4, 2); DEBUG: pushing down the procedure @@ -398,18 +425,51 @@ DETAIL: A distributed function is created. 
To make sure subsequent commands see CALL multi_mx_call.mx_call_proc_tx(20); DEBUG: pushing down the procedure SELECT id, val FROM mx_call_dist_table_1 ORDER BY id, val; - id | val + id | val --------------------------------------------------------------------- - 3 | 1 - 3 | 5 - 4 | 5 - 6 | 5 - 9 | 2 - 10 | -2 - 11 | 3 - 20 | -2 - 21 | 3 -(9 rows) + 3 | 1 + 3 | 5 + 4 | 5 + 6 | 5 + 9 | 2 + 10 | -2 + 11 | 3 + 20 | -2 + 21 | 3 + 100 | 98 + 100 | 98 + 100 | 98 + 101 | 99 + 101 | 99 + 101 | 99 + 102 | 100 + 102 | 100 + 102 | 100 + 103 | 101 + 103 | 101 + 103 | 101 + 104 | 102 + 104 | 102 + 104 | 102 + 105 | 103 + 105 | 103 + 105 | 103 + 106 | 104 + 106 | 104 + 106 | 104 + 107 | 105 + 107 | 105 + 107 | 105 + 108 | 106 + 108 | 106 + 108 | 106 + 109 | 107 + 109 | 107 + 109 | 107 + 110 | 108 + 110 | 108 + 110 | 108 +(42 rows) -- Show that function delegation works from worker nodes as well \c - - - :worker_1_port @@ -539,4 +599,4 @@ PL/pgSQL function mx_call_proc(integer,integer) line XX at assignment reset client_min_messages; \set VERBOSITY terse drop schema multi_mx_call cascade; -NOTICE: drop cascades to 13 other objects +NOTICE: drop cascades to 14 other objects diff --git a/src/test/regress/expected/multi_mx_call_0.out b/src/test/regress/expected/multi_mx_call_0.out index 77667f75b..496e735c9 100644 --- a/src/test/regress/expected/multi_mx_call_0.out +++ b/src/test/regress/expected/multi_mx_call_0.out @@ -131,6 +131,12 @@ BEGIN y := x; x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); END;$$; +CREATE PROCEDURE mx_call_proc_copy(x int) +LANGUAGE plpgsql AS $$ +BEGIN + INSERT INTO multi_mx_call.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s; +END;$$; -- Test that undistributed procedures have no issue executing call multi_mx_call.mx_call_proc(2, 0); y @@ -144,6 +150,7 @@ call multi_mx_call.mx_call_proc_custom_types('S', 'A'); F | S (1 row) +call multi_mx_call.mx_call_proc_copy(2); -- Same for unqualified names call mx_call_proc(2, 0); y @@ -176,6 +183,12 @@ select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_ca (1 row) +select create_distributed_function('mx_call_proc_copy(int)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + -- We still don't route them to the workers, because they aren't -- colocated with any distributed tables. SET client_min_messages TO DEBUG1; @@ -206,6 +219,12 @@ DEBUG: stored procedure does not have co-located tables F | S (1 row) +call multi_mx_call.mx_call_proc_copy(2); +DEBUG: stored procedure does not have co-located tables +DEBUG: Collecting INSERT ... SELECT results on coordinator +CONTEXT: SQL statement "INSERT INTO multi_mx_call.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s" +PL/pgSQL function mx_call_proc_copy(integer) line XX at SQL statement -- Mark them as colocated with a table. Now we should route them to workers. 
select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); colocate_proc_with_table @@ -225,6 +244,12 @@ select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table (1 row) +select colocate_proc_with_table('mx_call_proc_copy', 'mx_call_dist_table_1'::regclass, 0); + colocate_proc_with_table +--------------------------------------------------------------------- + +(1 row) + call multi_mx_call.mx_call_proc(2, 0); DEBUG: pushing down the procedure y @@ -253,6 +278,8 @@ DEBUG: pushing down the procedure S | S (1 row) +call mx_call_proc_copy(2); +DEBUG: pushing down the procedure -- Test implicit cast of int to bigint call mx_call_proc_bigint(4, 2); DEBUG: pushing down the procedure @@ -398,18 +425,51 @@ DETAIL: A distributed function is created. To make sure subsequent commands see CALL multi_mx_call.mx_call_proc_tx(20); DEBUG: pushing down the procedure SELECT id, val FROM mx_call_dist_table_1 ORDER BY id, val; - id | val + id | val --------------------------------------------------------------------- - 3 | 1 - 3 | 5 - 4 | 5 - 6 | 5 - 9 | 2 - 10 | -2 - 11 | 3 - 20 | -2 - 21 | 3 -(9 rows) + 3 | 1 + 3 | 5 + 4 | 5 + 6 | 5 + 9 | 2 + 10 | -2 + 11 | 3 + 20 | -2 + 21 | 3 + 100 | 98 + 100 | 98 + 100 | 98 + 101 | 99 + 101 | 99 + 101 | 99 + 102 | 100 + 102 | 100 + 102 | 100 + 103 | 101 + 103 | 101 + 103 | 101 + 104 | 102 + 104 | 102 + 104 | 102 + 105 | 103 + 105 | 103 + 105 | 103 + 106 | 104 + 106 | 104 + 106 | 104 + 107 | 105 + 107 | 105 + 107 | 105 + 108 | 106 + 108 | 106 + 108 | 106 + 109 | 107 + 109 | 107 + 109 | 107 + 110 | 108 + 110 | 108 + 110 | 108 +(42 rows) -- Show that function delegation works from worker nodes as well \c - - - :worker_1_port @@ -539,4 +599,4 @@ PL/pgSQL function mx_call_proc(integer,integer) line XX at assignment reset client_min_messages; \set VERBOSITY terse drop schema multi_mx_call cascade; -NOTICE: drop cascades to 13 other objects +NOTICE: drop cascades to 14 other objects diff --git a/src/test/regress/expected/multi_mx_function_call_delegation.out b/src/test/regress/expected/multi_mx_function_call_delegation.out index 4fd13cee8..e77e0c3b5 100644 --- a/src/test/regress/expected/multi_mx_function_call_delegation.out +++ b/src/test/regress/expected/multi_mx_function_call_delegation.out @@ -83,6 +83,16 @@ BEGIN y := x; x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); END;$$; +-- function which internally uses COPY protocol without remote execution +CREATE FUNCTION mx_call_func_copy(x int) +RETURNS bool +LANGUAGE plpgsql AS $$ +BEGIN + INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s; + + RETURN true; +END;$$; -- Test that undistributed functions have no issue executing select multi_mx_function_call_delegation.mx_call_func(2, 0); mx_call_func @@ -96,6 +106,9 @@ select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); (F,S) (1 row) +select multi_mx_function_call_delegation.mx_call_copy(2); +ERROR: function multi_mx_function_call_delegation.mx_call_copy(integer) does not exist +HINT: No function matches the given name and argument types. You might need to add explicit type casts. 
select squares(4); squares --------------------------------------------------------------------- @@ -131,6 +144,12 @@ select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_ca (1 row) +select create_distributed_function('mx_call_func_copy(int)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + select create_distributed_function('squares(int)'); create_distributed_function --------------------------------------------------------------------- @@ -604,20 +623,6 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment 29 (1 row) -select mx_call_func(2, 0) from mx_call_dist_table_1; - mx_call_func ---------------------------------------------------------------------- - 28 - 28 - 28 - 28 - 28 - 28 - 28 - 28 - 28 -(9 rows) - select mx_call_func(2, 0) where mx_call_func(0, 2) = 0; DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" @@ -647,6 +652,24 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment 29 | 27 (1 row) +-- we do not delegate the call, but do push down the query +-- that result in remote execution from workers +select mx_call_func(id, 0) from mx_call_dist_table_1; +ERROR: cannot execute a distributed query from a query on a shard +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function multi_mx_function_call_delegation.mx_call_func(integer,integer) line XX at assignment +while executing command on localhost:xxxxx +select mx_call_func(2, 0) from mx_call_dist_table_1 where id = 3; +ERROR: cannot execute a distributed query from a query on a shard +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function multi_mx_function_call_delegation.mx_call_func(integer,integer) line XX at assignment +while executing command on localhost:xxxxx +select mx_call_func_copy(2) from mx_call_dist_table_1 where id = 3; +ERROR: cannot execute a distributed query from a query on a shard +CONTEXT: SQL statement "INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s" +PL/pgSQL function multi_mx_function_call_delegation.mx_call_func_copy(integer) line XX at SQL statement +while executing command on localhost:xxxxx DO $$ BEGIN perform mx_call_func_tbl(40); END; $$; DEBUG: not pushing down function calls in a multi-statement transaction CONTEXT: SQL statement "SELECT mx_call_func_tbl(40)" @@ -725,7 +748,12 @@ HINT: Connect to the coordinator and run it again. 
-- show that functions can be delegated from worker nodes SET client_min_messages TO DEBUG1; SELECT mx_call_func(2, 0); -DEBUG: pushing down the function call +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line XX at assignment +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT ((2 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))))::integer +CONTEXT: PL/pgSQL assignment "y := y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line XX at assignment mx_call_func --------------------------------------------------------------------- 28 @@ -736,4 +764,4 @@ SET search_path TO multi_mx_function_call_delegation, public; RESET client_min_messages; \set VERBOSITY terse DROP SCHEMA multi_mx_function_call_delegation CASCADE; -NOTICE: drop cascades to 14 other objects +NOTICE: drop cascades to 15 other objects diff --git a/src/test/regress/expected/multi_mx_function_call_delegation_0.out b/src/test/regress/expected/multi_mx_function_call_delegation_0.out index 70672b455..657183bc2 100644 --- a/src/test/regress/expected/multi_mx_function_call_delegation_0.out +++ b/src/test/regress/expected/multi_mx_function_call_delegation_0.out @@ -83,6 +83,16 @@ BEGIN y := x; x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); END;$$; +-- function which internally uses COPY protocol without remote execution +CREATE FUNCTION mx_call_func_copy(x int) +RETURNS bool +LANGUAGE plpgsql AS $$ +BEGIN + INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s; + + RETURN true; +END;$$; -- Test that undistributed functions have no issue executing select multi_mx_function_call_delegation.mx_call_func(2, 0); mx_call_func @@ -96,6 +106,9 @@ select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); (F,S) (1 row) +select multi_mx_function_call_delegation.mx_call_copy(2); +ERROR: function multi_mx_function_call_delegation.mx_call_copy(integer) does not exist +HINT: No function matches the given name and argument types. You might need to add explicit type casts. 
select squares(4); squares --------------------------------------------------------------------- @@ -131,6 +144,12 @@ select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_ca (1 row) +select create_distributed_function('mx_call_func_copy(int)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + select create_distributed_function('squares(int)'); create_distributed_function --------------------------------------------------------------------- @@ -604,20 +623,6 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment 29 (1 row) -select mx_call_func(2, 0) from mx_call_dist_table_1; - mx_call_func ---------------------------------------------------------------------- - 28 - 28 - 28 - 28 - 28 - 28 - 28 - 28 - 28 -(9 rows) - select mx_call_func(2, 0) where mx_call_func(0, 2) = 0; DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" @@ -647,6 +652,24 @@ PL/pgSQL function mx_call_func(integer,integer) line XX at assignment 29 | 27 (1 row) +-- we do not delegate the call, but do push down the query +-- that result in remote execution from workers +select mx_call_func(id, 0) from mx_call_dist_table_1; +ERROR: cannot execute a distributed query from a query on a shard +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function multi_mx_function_call_delegation.mx_call_func(integer,integer) line XX at assignment +while executing command on localhost:xxxxx +select mx_call_func(2, 0) from mx_call_dist_table_1 where id = 3; +ERROR: cannot execute a distributed query from a query on a shard +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function multi_mx_function_call_delegation.mx_call_func(integer,integer) line XX at assignment +while executing command on localhost:xxxxx +select mx_call_func_copy(2) from mx_call_dist_table_1 where id = 3; +ERROR: cannot execute a distributed query from a query on a shard +CONTEXT: SQL statement "INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s" +PL/pgSQL function multi_mx_function_call_delegation.mx_call_func_copy(integer) line XX at SQL statement +while executing command on localhost:xxxxx DO $$ BEGIN perform mx_call_func_tbl(40); END; $$; DEBUG: not pushing down function calls in a multi-statement transaction CONTEXT: SQL statement "SELECT mx_call_func_tbl(40)" @@ -725,7 +748,12 @@ HINT: Connect to the coordinator and run it again. 
-- show that functions can be delegated from worker nodes SET client_min_messages TO DEBUG1; SELECT mx_call_func(2, 0); -DEBUG: pushing down the function call +DEBUG: generating subplan XXX_1 for subquery SELECT sum((t1.val OPERATOR(pg_catalog.+) t2.val)) AS sum FROM (multi_mx_function_call_delegation.mx_call_dist_table_1 t1 JOIN multi_mx_function_call_delegation.mx_call_dist_table_2 t2 ON ((t1.id OPERATOR(pg_catalog.=) t2.id))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line XX at assignment +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT (2 OPERATOR(pg_catalog.+) (SELECT intermediate_result.sum FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(sum bigint))) +CONTEXT: SQL statement "SELECT y + (select sum(t1.val + t2.val) from multi_mx_function_call_delegation.mx_call_dist_table_1 t1 join multi_mx_function_call_delegation.mx_call_dist_table_2 t2 on t1.id = t2.id)" +PL/pgSQL function mx_call_func(integer,integer) line XX at assignment mx_call_func --------------------------------------------------------------------- 28 @@ -736,4 +764,4 @@ SET search_path TO multi_mx_function_call_delegation, public; RESET client_min_messages; \set VERBOSITY terse DROP SCHEMA multi_mx_function_call_delegation CASCADE; -NOTICE: drop cascades to 14 other objects +NOTICE: drop cascades to 15 other objects diff --git a/src/test/regress/sql/citus_local_table_triggers.sql b/src/test/regress/sql/citus_local_table_triggers.sql index d091c498b..76b192388 100644 --- a/src/test/regress/sql/citus_local_table_triggers.sql +++ b/src/test/regress/sql/citus_local_table_triggers.sql @@ -268,6 +268,15 @@ BEGIN END; $insert_100$ LANGUAGE plpgsql; +CREATE TABLE local_table (value int); + +CREATE FUNCTION insert_100_local() RETURNS trigger AS $insert_100$ +BEGIN + INSERT INTO local_table VALUES (100); + RETURN NEW; +END; +$insert_100$ LANGUAGE plpgsql; + BEGIN; CREATE TRIGGER insert_100_trigger AFTER TRUNCATE ON another_citus_local_table @@ -282,7 +291,7 @@ BEGIN; SELECT * FROM reference_table; ROLLBACK; - +-- cannot perform remote execution from a trigger on a Citus local table BEGIN; -- update should actually update something to test ON UPDATE CASCADE logic INSERT INTO another_citus_local_table VALUES (600); @@ -296,9 +305,47 @@ BEGIN; AFTER UPDATE ON citus_local_table FOR EACH STATEMENT EXECUTE FUNCTION insert_100(); + UPDATE another_citus_local_table SET value=value-1;; +ROLLBACK; + +-- can perform regular execution from a trigger on a Citus local table +BEGIN; + -- update should actually update something to test ON UPDATE CASCADE logic + INSERT INTO another_citus_local_table VALUES (600); + INSERT INTO citus_local_table VALUES (600); + + CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON another_citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local(); + + CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local(); + UPDATE another_citus_local_table SET value=value-1;; -- we should see two rows with "100" - SELECT * FROM reference_table; + SELECT * FROM local_table; +ROLLBACK; + +-- can perform local execution from a trigger on a Citus local table +BEGIN; + SELECT citus_add_local_table_to_metadata('local_table'); + + -- update should actually update something to 
test ON UPDATE CASCADE logic + INSERT INTO another_citus_local_table VALUES (600); + INSERT INTO citus_local_table VALUES (600); + + CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON another_citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local(); + + CREATE TRIGGER insert_100_trigger + AFTER UPDATE ON citus_local_table + FOR EACH STATEMENT EXECUTE FUNCTION insert_100_local(); + + UPDATE another_citus_local_table SET value=value-1;; + -- we should see two rows with "100" + SELECT * FROM local_table; ROLLBACK; -- test on partitioned citus local tables diff --git a/src/test/regress/sql/foreign_tables_mx.sql b/src/test/regress/sql/foreign_tables_mx.sql index a7c3138df..fdd391d10 100644 --- a/src/test/regress/sql/foreign_tables_mx.sql +++ b/src/test/regress/sql/foreign_tables_mx.sql @@ -65,17 +65,15 @@ ALTER FOREIGN TABLE public.foreign_table_newname VALIDATE CONSTRAINT check_c_2; ALTER FOREIGN TABLE public.foreign_table_newname DROP constraint IF EXISTS check_c_2; -- trigger test -CREATE TABLE distributed_table(value int); -SELECT create_distributed_table('distributed_table', 'value'); +CREATE TABLE table42(value int); CREATE FUNCTION insert_42() RETURNS trigger AS $insert_42$ BEGIN - INSERT INTO distributed_table VALUES (42); + INSERT INTO table42 VALUES (42); RETURN NEW; END; $insert_42$ LANGUAGE plpgsql; - CREATE TRIGGER insert_42_trigger AFTER DELETE ON public.foreign_table_newname FOR EACH ROW EXECUTE FUNCTION insert_42(); @@ -83,14 +81,14 @@ FOR EACH ROW EXECUTE FUNCTION insert_42(); -- do the same pattern from the workers as well INSERT INTO public.foreign_table_newname VALUES (99, 'test_2'); delete from public.foreign_table_newname where id_test = 99; -select * from distributed_table ORDER BY value; +select * from table42 ORDER BY value; -- disable trigger alter foreign table public.foreign_table_newname disable trigger insert_42_trigger; INSERT INTO public.foreign_table_newname VALUES (99, 'test_2'); delete from public.foreign_table_newname where id_test = 99; -- should not insert again as trigger disabled -select * from distributed_table ORDER BY value; +select * from table42 ORDER BY value; DROP TRIGGER insert_42_trigger ON public.foreign_table_newname; diff --git a/src/test/regress/sql/multi_mx_call.sql b/src/test/regress/sql/multi_mx_call.sql index 7df194ea7..4728b8948 100644 --- a/src/test/regress/sql/multi_mx_call.sql +++ b/src/test/regress/sql/multi_mx_call.sql @@ -100,9 +100,18 @@ BEGIN x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); END;$$; +CREATE PROCEDURE mx_call_proc_copy(x int) +LANGUAGE plpgsql AS $$ +BEGIN + INSERT INTO multi_mx_call.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s; +END;$$; + + -- Test that undistributed procedures have no issue executing call multi_mx_call.mx_call_proc(2, 0); call multi_mx_call.mx_call_proc_custom_types('S', 'A'); +call multi_mx_call.mx_call_proc_copy(2); -- Same for unqualified names call mx_call_proc(2, 0); @@ -112,6 +121,7 @@ call mx_call_proc_custom_types('S', 'A'); select create_distributed_function('mx_call_proc(int,int)'); select create_distributed_function('mx_call_proc_bigint(bigint,bigint)'); select create_distributed_function('mx_call_proc_custom_types(mx_call_enum,mx_call_enum)'); +select create_distributed_function('mx_call_proc_copy(int)'); -- We still don't route them to the workers, because they aren't -- colocated with any distributed tables. 
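As the expected output earlier in this patch shows, the routing decision for the new procedure hinges on colocation metadata: the same CALL goes from running locally on the coordinator to being pushed down once the procedure is colocated with a distributed table. Roughly, with client_min_messages set to DEBUG1 as in these tests (colocate_proc_with_table is the helper used throughout this regression suite):

CALL multi_mx_call.mx_call_proc_copy(2);
-- DEBUG:  stored procedure does not have co-located tables

SELECT colocate_proc_with_table('mx_call_proc_copy', 'mx_call_dist_table_1'::regclass, 0);

CALL mx_call_proc_copy(2);
-- DEBUG:  pushing down the procedure
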
@@ -119,16 +129,19 @@ SET client_min_messages TO DEBUG1; call multi_mx_call.mx_call_proc(2, 0); call mx_call_proc_bigint(4, 2); call multi_mx_call.mx_call_proc_custom_types('S', 'A'); +call multi_mx_call.mx_call_proc_copy(2); -- Mark them as colocated with a table. Now we should route them to workers. select colocate_proc_with_table('mx_call_proc', 'mx_call_dist_table_1'::regclass, 1); select colocate_proc_with_table('mx_call_proc_bigint', 'mx_call_dist_table_bigint'::regclass, 1); select colocate_proc_with_table('mx_call_proc_custom_types', 'mx_call_dist_table_enum'::regclass, 1); +select colocate_proc_with_table('mx_call_proc_copy', 'mx_call_dist_table_1'::regclass, 0); call multi_mx_call.mx_call_proc(2, 0); call multi_mx_call.mx_call_proc_custom_types('S', 'A'); call mx_call_proc(2, 0); call mx_call_proc_custom_types('S', 'A'); +call mx_call_proc_copy(2); -- Test implicit cast of int to bigint call mx_call_proc_bigint(4, 2); diff --git a/src/test/regress/sql/multi_mx_function_call_delegation.sql b/src/test/regress/sql/multi_mx_function_call_delegation.sql index b2d26e853..4dfe91322 100644 --- a/src/test/regress/sql/multi_mx_function_call_delegation.sql +++ b/src/test/regress/sql/multi_mx_function_call_delegation.sql @@ -67,9 +67,21 @@ BEGIN x := (select case groupid when 0 then 'F' else 'S' end from pg_dist_local_group); END;$$; +-- function which internally uses COPY protocol without remote execution +CREATE FUNCTION mx_call_func_copy(x int) +RETURNS bool +LANGUAGE plpgsql AS $$ +BEGIN + INSERT INTO multi_mx_function_call_delegation.mx_call_dist_table_1 + SELECT s,s FROM generate_series(100, 110) s; + + RETURN true; +END;$$; + -- Test that undistributed functions have no issue executing select multi_mx_function_call_delegation.mx_call_func(2, 0); select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); +select multi_mx_function_call_delegation.mx_call_copy(2); select squares(4); -- Same for unqualified name @@ -79,6 +91,7 @@ select mx_call_func(2, 0); select create_distributed_function('mx_call_func(int,int)'); select create_distributed_function('mx_call_func_bigint(bigint,bigint)'); select create_distributed_function('mx_call_func_custom_types(mx_call_enum,mx_call_enum)'); +select create_distributed_function('mx_call_func_copy(int)'); select create_distributed_function('squares(int)'); @@ -249,10 +262,15 @@ select mx_call_func(floor(random())::int, 2); -- test forms we don't distribute select * from mx_call_func(2, 0); -select mx_call_func(2, 0) from mx_call_dist_table_1; select mx_call_func(2, 0) where mx_call_func(0, 2) = 0; select mx_call_func(2, 0), mx_call_func(0, 2); +-- we do not delegate the call, but do push down the query +-- that result in remote execution from workers +select mx_call_func(id, 0) from mx_call_dist_table_1; +select mx_call_func(2, 0) from mx_call_dist_table_1 where id = 3; +select mx_call_func_copy(2) from mx_call_dist_table_1 where id = 3; + DO $$ BEGIN perform mx_call_func_tbl(40); END; $$; SELECT * FROM mx_call_dist_table_1 WHERE id >= 40 ORDER BY id, val; From 3cc44ed8b3c4d26ab0642c2b810b502bfd55a27a Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Mon, 10 Jan 2022 10:23:09 +0300 Subject: [PATCH 5/6] Tell other backends it's safe to ignore the backend that concurrently built the shell table index (#5520) In addition to starting a new transaction, we also need to tell other backends --including the ones spawned for connections opened to localhost to build indexes on shards of this relation-- that concurrent index builds can safely 
ignore us. Normally, DefineIndex() only does that if index doesn't have any predicates (i.e.: where clause) and no index expressions at all. However, now that we already called standard process utility, index build on the shell table is finished anyway. The reason behind doing so is that we cannot guarantee not grabbing any snapshots via adaptive executor, and the backends creating indexes on local shards (if any) might block on waiting for current xact of the current backend to finish, which would cause self deadlocks that are not detectable. --- .../distributed/commands/utility_hook.c | 86 +++++++++++++++++++ .../connection/connection_management.c | 6 ++ .../distributed/connection_management.h | 3 + 3 files changed, 95 insertions(+) diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 3e2a2d24b..520d7f7fd 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -33,7 +33,9 @@ #include "access/attnum.h" #include "access/heapam.h" #include "access/htup_details.h" +#if PG_VERSION_NUM < 140000 #include "access/xact.h" +#endif #include "catalog/catalog.h" #include "catalog/dependency.h" #include "commands/dbcommands.h" @@ -52,7 +54,9 @@ #include "distributed/local_executor.h" #include "distributed/maintenanced.h" #include "distributed/multi_partitioning_utils.h" +#if PG_VERSION_NUM < 140000 #include "distributed/metadata_cache.h" +#endif #include "distributed/metadata_sync.h" #include "distributed/metadata/distobject.h" #include "distributed/multi_executor.h" @@ -91,6 +95,9 @@ static void ProcessUtilityInternal(PlannedStmt *pstmt, struct QueryEnvironment *queryEnv, DestReceiver *dest, QueryCompletionCompat *completionTag); +#if PG_VERSION_NUM >= 140000 +static void set_indexsafe_procflags(void); +#endif static char * SetSearchPathToCurrentSearchPathCommand(void); static char * CurrentSearchPath(void); static void IncrementUtilityHookCountersIfNecessary(Node *parsetree); @@ -1118,9 +1125,35 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) /* * Start a new transaction to make sure CONCURRENTLY commands * on localhost do not block waiting for this transaction to finish. + * + * In addition to doing that, we also need to tell other backends + * --including the ones spawned for connections opened to localhost to + * build indexes on shards of this relation-- that concurrent index + * builds can safely ignore us. + * + * Normally, DefineIndex() only does that if index doesn't have any + * predicates (i.e.: where clause) and no index expressions at all. + * However, now that we already called standard process utility, + * index build on the shell table is finished anyway. + * + * The reason behind doing so is that we cannot guarantee not + * grabbing any snapshots via adaptive executor, and the backends + * creating indexes on local shards (if any) might block on waiting + * for current xact of the current backend to finish, which would + * cause self deadlocks that are not detectable. */ if (ddlJob->startNewTransaction) { +#if PG_VERSION_NUM < 140000 + + /* + * Older versions of postgres doesn't have PROC_IN_SAFE_IC flag + * so we cannot use set_indexsafe_procflags in those versions. + * + * For this reason, we do our best to ensure not grabbing any + * snapshots later in the executor. + */ + /* * If cache is not populated, system catalog lookups will cause * the xmin of current backend to change. 
Then the last phase @@ -1141,8 +1174,34 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) * will already be in the hash table, hence we won't be holding any snapshots. */ WarmUpConnParamsHash(); +#endif + + /* + * Since it is not certain whether the code-path that we followed + * until reaching here caused grabbing any snapshots or not, we + * need to pop the active snapshot if we had any, to ensure not + * leaking any snapshots. + * + * For example, EnsureCoordinator might return without grabbing + * any snapshots if we didn't receive any invalidation messages + * but the otherwise is also possible. + */ + if (ActiveSnapshotSet()) + { + PopActiveSnapshot(); + } + CommitTransactionCommand(); StartTransactionCommand(); + +#if PG_VERSION_NUM >= 140000 + + /* + * Tell other backends to ignore us, even if we grab any + * snapshots via adaptive executor. + */ + set_indexsafe_procflags(); +#endif } MemoryContext savedContext = CurrentMemoryContext; @@ -1205,6 +1264,33 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) } +#if PG_VERSION_NUM >= 140000 + +/* + * set_indexsafe_procflags sets PROC_IN_SAFE_IC flag in MyProc->statusFlags. + * + * The flag is reset automatically at transaction end, so it must be set + * for each transaction. + * + * Copied from pg/src/backend/commands/indexcmds.c + * Also see pg commit c98763bf51bf610b3ee7e209fc76c3ff9a6b3163. + */ +static void +set_indexsafe_procflags(void) +{ + Assert(MyProc->xid == InvalidTransactionId && + MyProc->xmin == InvalidTransactionId); + + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); + MyProc->statusFlags |= PROC_IN_SAFE_IC; + ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags; + LWLockRelease(ProcArrayLock); +} + + +#endif + + /* * CreateCustomDDLTaskList creates a DDLJob which will apply a command to all placements * of shards of a distributed table. The command to be applied is generated by the diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index ce30e2b0f..9ca2cbb96 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -36,6 +36,7 @@ #include "distributed/version_compat.h" #include "distributed/worker_log_messages.h" #include "mb/pg_wchar.h" +#include "pg_config.h" #include "portability/instr_time.h" #include "storage/ipc.h" #include "utils/hsearch.h" @@ -1242,6 +1243,8 @@ StartConnectionEstablishment(MultiConnection *connection, ConnectionHashKey *key } +#if PG_VERSION_NUM < 140000 + /* * WarmUpConnParamsHash warms up the ConnParamsHash by loading all the * conn params for active primary nodes. @@ -1263,6 +1266,9 @@ WarmUpConnParamsHash(void) } +#endif + + /* * FindOrCreateConnParamsEntry searches ConnParamsHash for the given key, * if it is not found, it is created. 
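Taken together, the utility_hook.c hunks above reduce to the following
shape for the startNewTransaction branch on PostgreSQL 14+ (a condensed
sketch for readability, not the verbatim source; on earlier versions the
PROC_IN_SAFE_IC flag does not exist, so the WarmUpConnParamsHash()
workaround shown above is kept instead):

	if (ddlJob->startNewTransaction)
	{
		/*
		 * Pop any snapshot grabbed on the way here so that no active
		 * snapshot leaks across the transaction boundary below.
		 */
		if (ActiveSnapshotSet())
		{
			PopActiveSnapshot();
		}

		/* fresh transaction, so CONCURRENTLY commands on localhost do not wait on us */
		CommitTransactionCommand();
		StartTransactionCommand();

		/*
		 * Tell other backends (via PROC_IN_SAFE_IC) to ignore us, even if
		 * the adaptive executor grabs snapshots later on.
		 */
		set_indexsafe_procflags();
	}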
diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h
index 68dde4fe3..aca9cee0f 100644
--- a/src/include/distributed/connection_management.h
+++ b/src/include/distributed/connection_management.h
@@ -16,6 +16,7 @@
 #include "distributed/transaction_management.h"
 #include "distributed/remote_transaction.h"
 #include "lib/ilist.h"
+#include "pg_config.h"
 #include "portability/instr_time.h"
 #include "utils/guc.h"
 #include "utils/hsearch.h"
@@ -283,5 +284,7 @@ extern void MarkConnectionConnected(MultiConnection *connection);
 extern double MillisecondsPassedSince(instr_time moment);
 extern long MillisecondsToTimeout(instr_time start, long msAfterStart);
 
+#if PG_VERSION_NUM < 140000
 extern void WarmUpConnParamsHash(void);
+#endif
 #endif /* CONNECTION_MANAGMENT_H */

From 885601c02caed52c14683b3eba5fed01111428d6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C3=96nder=20Kalac=C4=B1?=
Date: Mon, 10 Jan 2022 17:30:13 +0100
Subject: [PATCH 6/6] Require superuser while activating a node (#5609)

* Require superuser while activating a node

With this change, ActivateNode() (and hence citus_add_node() and
citus_activate_node()) explicitly requires a superuser.

Before this commit, these functions were designed to work with
non-superuser roles that were given the relevant GRANTs. However, that
is not a widely used way of calling the functions above.

Because a non-superuser could call the UDFs, they were designed in a
way that some commands were sent over additional short-lived superuser
connections. That was:

(a) breaking transactional behavior (e.g., ROLLBACK wouldn't fully
    roll back the whole transaction)

(b) making it very complicated to reason about which parts of the node
    activation go over which connections, and becoming vulnerable to
    deadlocks / visibility issues.
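In practice the new requirement surfaces as below. This is a sketch
distilled from the multi_cluster_management regression test updated in
this patch; the node_metadata_user role (a non-superuser granted EXECUTE
on the node UDFs) and the :worker_2_port psql variable are test
fixtures, not part of the change itself.

-- a privileged, but non-superuser, role
SET ROLE node_metadata_user;

-- adding an inactive node is still allowed for such a role ...
SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port);

-- ... but adding (and hence activating) a primary node no longer is
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
-- ERROR:  operation is not allowed
-- HINT:  Run the command with a superuser.

-- a superuser can add and activate the node within a single transaction
RESET ROLE;
SELECT 1 FROM master_add_node('localhost', :worker_2_port);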
--- .../distributed/commands/dependencies.c | 8 ++++-- .../distributed/metadata/node_metadata.c | 24 ++++++++++++---- .../distributed/utils/reference_table_utils.c | 28 +++++++++++-------- .../expected/multi_cluster_management.out | 27 ++++-------------- .../regress/sql/multi_cluster_management.sql | 5 ++-- 5 files changed, 49 insertions(+), 43 deletions(-) diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index a8fa5c118..19fc94646 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -368,8 +368,12 @@ ReplicateAllDependenciesToNode(const char *nodeName, int nodePort) /* since we are executing ddl commands lets disable propagation, primarily for mx */ ddlCommands = list_concat(list_make1(DISABLE_DDL_PROPAGATION), ddlCommands); - SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, - CitusExtensionOwnerName(), ddlCommands); + /* send commands to new workers, the current user should a superuser */ + Assert(superuser()); + SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, + nodePort, + CurrentUserName(), + ddlCommands); } diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index bd9fb55d9..63ac316a8 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -641,11 +641,12 @@ PropagateNodeWideObjects(WorkerNode *newWorkerNode) ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands); ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION); - /* send commands to new workers*/ - SendCommandListToWorkerOutsideTransaction(newWorkerNode->workerName, - newWorkerNode->workerPort, - CitusExtensionOwnerName(), - ddlCommands); + /* send commands to new workers, the current user should be a superuser */ + Assert(superuser()); + SendMetadataCommandListToWorkerInCoordinatedTransaction(newWorkerNode->workerName, + newWorkerNode->workerPort, + CurrentUserName(), + ddlCommands); } } @@ -851,6 +852,19 @@ ActivateNode(char *nodeName, int nodePort) { bool isActive = true; + /* + * We currently require the object propagation to happen via superuser, + * see #5139. While activating a node, we sync both metadata and object + * propagation. + * + * In order to have a fully transactional semantics with add/activate + * node operations, we require superuser. Note that for creating + * non-owned objects, we already require a superuser connection. + * By ensuring the current user to be a superuser, we can guarantee + * to send all commands within the same remote transaction. 
+ */ + EnsureSuperUser(); + /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ LockRelationOid(DistNodeRelationId(), ExclusiveLock); diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index c20e38034..50ee92d0e 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -45,8 +45,9 @@ static StringInfo CopyShardPlacementToWorkerNodeQuery( ShardPlacement *sourceShardPlacement, WorkerNode *workerNode, char transferMode); -static void ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, - int nodePort); +static void ReplicateReferenceTableShardToNode(ShardInterval *shardInterval, + char *nodeName, + int nodePort); static bool AnyRelationsModifiedInTransaction(List *relationIdList); static List * ReplicatedMetadataSyncedDistributedTableList(void); @@ -336,7 +337,8 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS) * table. */ static void -ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) +ReplicateReferenceTableShardToNode(ShardInterval *shardInterval, char *nodeName, + int nodePort) { uint64 shardId = shardInterval->shardId; @@ -351,8 +353,6 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId); ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList, nodeName, nodePort); - char *tableOwner = TableOwner(shardInterval->relationId); - if (targetPlacement != NULL) { if (targetPlacement->shardState == SHARD_STATE_ACTIVE) @@ -370,9 +370,11 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) get_rel_name(shardInterval->relationId), nodeName, nodePort))); - EnsureNoModificationsHaveBeenDone(); - SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, tableOwner, - ddlCommandList); + /* send commands to new workers, the current user should be a superuser */ + Assert(superuser()); + SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort, + CurrentUserName(), + ddlCommandList); int32 groupId = GroupForNode(nodeName, nodePort); uint64 placementId = GetNextPlacementId(); @@ -587,17 +589,19 @@ ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort) LockShardDistributionMetadata(shardId, ExclusiveLock); - ReplicateShardToNode(shardInterval, nodeName, nodePort); + ReplicateReferenceTableShardToNode(shardInterval, nodeName, nodePort); } /* create foreign constraints between reference tables */ foreach_ptr(shardInterval, referenceShardIntervalList) { - char *tableOwner = TableOwner(shardInterval->relationId); List *commandList = CopyShardForeignConstraintCommandList(shardInterval); - SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, tableOwner, - commandList); + /* send commands to new workers, the current user should be a superuser */ + Assert(superuser()); + SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort, + CurrentUserName(), + commandList); } } } diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out index b851d6909..5d3d9f26e 100644 --- a/src/test/regress/expected/multi_cluster_management.out +++ b/src/test/regress/expected/multi_cluster_management.out @@ -270,6 +270,9 @@ ERROR: permission denied for function master_update_node -- try to manipulate node metadata via privileged user SET ROLE 
node_metadata_user; SET citus.enable_object_propagation TO off; -- prevent master activate node to actually connect for this test +SELECT 1 FROM master_add_node('localhost', :worker_2_port); +ERROR: operation is not allowed +HINT: Run the command with a superuser. BEGIN; SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port); ?column? @@ -277,29 +280,13 @@ SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port); 1 (1 row) -SELECT 1 FROM master_activate_node('localhost', :worker_2_port); -WARNING: citus.enable_object_propagation is off, not creating distributed objects on worker -DETAIL: distributed objects are only kept in sync when citus.enable_object_propagation is set to on. Newly activated nodes will not get these objects created - ?column? ---------------------------------------------------------------------- - 1 -(1 row) - SELECT 1 FROM master_remove_node('localhost', :worker_2_port); ?column? --------------------------------------------------------------------- 1 (1 row) -SELECT 1 FROM master_add_node('localhost', :worker_2_port); -WARNING: citus.enable_object_propagation is off, not creating distributed objects on worker -DETAIL: distributed objects are only kept in sync when citus.enable_object_propagation is set to on. Newly activated nodes will not get these objects created - ?column? ---------------------------------------------------------------------- - 1 -(1 row) - -SELECT 1 FROM master_add_secondary_node('localhost', :worker_2_port + 2, 'localhost', :worker_2_port); +SELECT 1 FROM master_add_secondary_node('localhost', :worker_2_port + 2, 'localhost', :worker_1_port); ?column? --------------------------------------------------------------------- 1 @@ -308,16 +295,14 @@ SELECT 1 FROM master_add_secondary_node('localhost', :worker_2_port + 2, 'localh SELECT master_update_node(nodeid, 'localhost', :worker_2_port + 3) FROM pg_dist_node WHERE nodeport = :worker_2_port; master_update_node --------------------------------------------------------------------- - -(1 row) +(0 rows) SELECT nodename, nodeport, noderole FROM pg_dist_node ORDER BY nodeport; nodename | nodeport | noderole --------------------------------------------------------------------- localhost | 57637 | primary localhost | 57640 | secondary - localhost | 57641 | primary -(3 rows) +(2 rows) ABORT; \c - postgres - :master_port diff --git a/src/test/regress/sql/multi_cluster_management.sql b/src/test/regress/sql/multi_cluster_management.sql index 20f0c8f16..df0a8e389 100644 --- a/src/test/regress/sql/multi_cluster_management.sql +++ b/src/test/regress/sql/multi_cluster_management.sql @@ -126,12 +126,11 @@ SELECT master_update_node(nodeid, 'localhost', :worker_2_port + 3) FROM pg_dist_ -- try to manipulate node metadata via privileged user SET ROLE node_metadata_user; SET citus.enable_object_propagation TO off; -- prevent master activate node to actually connect for this test +SELECT 1 FROM master_add_node('localhost', :worker_2_port); BEGIN; SELECT 1 FROM master_add_inactive_node('localhost', :worker_2_port); -SELECT 1 FROM master_activate_node('localhost', :worker_2_port); SELECT 1 FROM master_remove_node('localhost', :worker_2_port); -SELECT 1 FROM master_add_node('localhost', :worker_2_port); -SELECT 1 FROM master_add_secondary_node('localhost', :worker_2_port + 2, 'localhost', :worker_2_port); +SELECT 1 FROM master_add_secondary_node('localhost', :worker_2_port + 2, 'localhost', :worker_1_port); SELECT master_update_node(nodeid, 'localhost', :worker_2_port + 3) FROM pg_dist_node WHERE 
nodeport = :worker_2_port; SELECT nodename, nodeport, noderole FROM pg_dist_node ORDER BY nodeport; ABORT;