From 8feb8c634a34055ef4a6cdac94275202e0f90ecf Mon Sep 17 00:00:00 2001 From: aykutbozkurt Date: Tue, 21 Mar 2023 00:46:01 +0300 Subject: [PATCH] =?UTF-8?q?PR=20#6728=20=C2=A0/=20commit=20-=203?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Let nontransactional sync mode create transaction per shell table during dropping the shell tables from worker. --- .../distributed/metadata/metadata_sync.c | 19 ++++++++++++++- .../distributed/sql/citus--11.2-1--11.3-1.sql | 2 ++ .../sql/downgrades/citus--11.3-1--11.2-1.sql | 2 ++ .../worker_drop_all_shell_tables/11.3-1.sql | 23 +++++++++++++++++++ .../worker_drop_all_shell_tables/latest.sql | 23 +++++++++++++++++++ src/include/distributed/metadata_sync.h | 7 ++++-- src/test/regress/expected/multi_extension.out | 3 ++- .../expected/upgrade_list_citus_objects.out | 1 + 8 files changed, 76 insertions(+), 4 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/worker_drop_all_shell_tables/11.3-1.sql create mode 100644 src/backend/distributed/sql/udfs/worker_drop_all_shell_tables/latest.sql diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index c210bd914..78ba24e97 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -716,11 +716,12 @@ DropMetadataSnapshotOnNode(WorkerNode *workerNode) * Detach partitions, break dependencies between sequences and table then * remove shell tables first. */ + bool singleTransaction = true; List *dropMetadataCommandList = DetachPartitionCommandList(); dropMetadataCommandList = lappend(dropMetadataCommandList, BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND); dropMetadataCommandList = lappend(dropMetadataCommandList, - REMOVE_ALL_SHELL_TABLES_COMMAND); + WorkerDropAllShellTablesCommand(singleTransaction)); dropMetadataCommandList = list_concat(dropMetadataCommandList, NodeMetadataDropCommands()); dropMetadataCommandList = lappend(dropMetadataCommandList, @@ -4485,3 +4486,19 @@ SendOrCollectCommandListToSingleNode(MetadataSyncContext *context, List *command pg_unreachable(); } } + + +/* + * WorkerDropAllShellTablesCommand returns command required to drop shell tables + * from workers. When singleTransaction is false, we create transaction per shell + * table. Otherwise, we drop all shell tables within single transaction. + */ +char * +WorkerDropAllShellTablesCommand(bool singleTransaction) +{ + char *singleTransactionString = (singleTransaction) ? "true" : "false"; + StringInfo removeAllShellTablesCommand = makeStringInfo(); + appendStringInfo(removeAllShellTablesCommand, WORKER_DROP_ALL_SHELL_TABLES, + singleTransactionString); + return removeAllShellTablesCommand->data; +} diff --git a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql index c14904a94..a6f7d7725 100644 --- a/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql +++ b/src/backend/distributed/sql/citus--11.2-1--11.3-1.sql @@ -7,3 +7,5 @@ ALTER TABLE pg_catalog.pg_dist_placement REPLICA IDENTITY USING INDEX pg_dist_pl ALTER TABLE pg_catalog.pg_dist_rebalance_strategy REPLICA IDENTITY USING INDEX pg_dist_rebalance_strategy_name_key; ALTER TABLE pg_catalog.pg_dist_shard REPLICA IDENTITY USING INDEX pg_dist_shard_shardid_index; ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY USING INDEX pg_dist_transaction_unique_constraint; + +#include "udfs/worker_drop_all_shell_tables/11.3-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql b/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql index c9fe75d1a..e89e54ab5 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.3-1--11.2-1.sql @@ -17,3 +17,5 @@ ALTER TABLE pg_catalog.pg_dist_placement REPLICA IDENTITY NOTHING; ALTER TABLE pg_catalog.pg_dist_rebalance_strategy REPLICA IDENTITY NOTHING; ALTER TABLE pg_catalog.pg_dist_shard REPLICA IDENTITY NOTHING; ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY NOTHING; + +DROP PROCEDURE pg_catalog.worker_drop_all_shell_tables(bool); diff --git a/src/backend/distributed/sql/udfs/worker_drop_all_shell_tables/11.3-1.sql b/src/backend/distributed/sql/udfs/worker_drop_all_shell_tables/11.3-1.sql new file mode 100644 index 000000000..55236286c --- /dev/null +++ b/src/backend/distributed/sql/udfs/worker_drop_all_shell_tables/11.3-1.sql @@ -0,0 +1,23 @@ + -- During metadata sync, when we send many ddls over single transaction, worker node can error due +-- to reaching at max allocation block size for invalidation messages. To find a workaround for the problem, +-- we added nontransactional metadata sync mode where we create many transaction while dropping shell tables +-- via https://github.com/citusdata/citus/pull/6728. +CREATE OR REPLACE PROCEDURE pg_catalog.worker_drop_all_shell_tables(singleTransaction bool DEFAULT true) +LANGUAGE plpgsql +AS $$ +DECLARE + table_name text; +BEGIN + -- drop shell tables within single or multiple transactions according to the flag singleTransaction + FOR table_name IN SELECT logicalrelid::regclass::text FROM pg_dist_partition + LOOP + PERFORM pg_catalog.worker_drop_shell_table(table_name); + IF not singleTransaction THEN + COMMIT; + END IF; + END LOOP; +END; +$$; +COMMENT ON PROCEDURE worker_drop_all_shell_tables(singleTransaction bool) + IS 'drop all distributed tables only without the metadata within single transaction or ' + 'multiple transaction specified by the flag singleTransaction'; diff --git a/src/backend/distributed/sql/udfs/worker_drop_all_shell_tables/latest.sql b/src/backend/distributed/sql/udfs/worker_drop_all_shell_tables/latest.sql new file mode 100644 index 000000000..55236286c --- /dev/null +++ b/src/backend/distributed/sql/udfs/worker_drop_all_shell_tables/latest.sql @@ -0,0 +1,23 @@ + -- During metadata sync, when we send many ddls over single transaction, worker node can error due +-- to reaching at max allocation block size for invalidation messages. To find a workaround for the problem, +-- we added nontransactional metadata sync mode where we create many transaction while dropping shell tables +-- via https://github.com/citusdata/citus/pull/6728. +CREATE OR REPLACE PROCEDURE pg_catalog.worker_drop_all_shell_tables(singleTransaction bool DEFAULT true) +LANGUAGE plpgsql +AS $$ +DECLARE + table_name text; +BEGIN + -- drop shell tables within single or multiple transactions according to the flag singleTransaction + FOR table_name IN SELECT logicalrelid::regclass::text FROM pg_dist_partition + LOOP + PERFORM pg_catalog.worker_drop_shell_table(table_name); + IF not singleTransaction THEN + COMMIT; + END IF; + END LOOP; +END; +$$; +COMMENT ON PROCEDURE worker_drop_all_shell_tables(singleTransaction bool) + IS 'drop all distributed tables only without the metadata within single transaction or ' + 'multiple transaction specified by the flag singleTransaction'; diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 89a859a7f..608dbbacc 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -153,14 +153,17 @@ extern void SendOrCollectCommandListToMetadataNodes(MetadataSyncContext *context extern void SendOrCollectCommandListToSingleNode(MetadataSyncContext *context, List *commands, int nodeIdx); +extern char * WorkerDropAllShellTablesCommand(bool singleTransaction); + #define DELETE_ALL_NODES "DELETE FROM pg_dist_node" #define DELETE_ALL_PLACEMENTS "DELETE FROM pg_dist_placement" #define DELETE_ALL_SHARDS "DELETE FROM pg_dist_shard" #define DELETE_ALL_DISTRIBUTED_OBJECTS "DELETE FROM pg_catalog.pg_dist_object" #define DELETE_ALL_PARTITIONS "DELETE FROM pg_dist_partition" #define DELETE_ALL_COLOCATION "DELETE FROM pg_catalog.pg_dist_colocation" -#define REMOVE_ALL_SHELL_TABLES_COMMAND \ - "SELECT worker_drop_shell_table(logicalrelid::regclass::text) FROM pg_dist_partition" +#define WORKER_DROP_ALL_SHELL_TABLES \ + "CALL pg_catalog.worker_drop_all_shell_tables(%s)" + #define REMOVE_ALL_CITUS_TABLES_COMMAND \ "SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition" #define BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND \ diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 2b9b70dfa..02021acd4 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1366,7 +1366,8 @@ SELECT * FROM multi_extension.print_extension_changes(); | function citus_internal_start_replication_origin_tracking() void | function citus_internal_stop_replication_origin_tracking() void | function worker_adjust_identity_column_seq_ranges(regclass) void -(4 rows) + | function worker_drop_all_shell_tables(boolean) +(5 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 857a749a3..92b91ccfc 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -245,6 +245,7 @@ ORDER BY 1; function worker_create_or_replace_object(text) function worker_create_or_replace_object(text[]) function worker_create_truncate_trigger(regclass) + function worker_drop_all_shell_tables(boolean) function worker_drop_distributed_table(text) function worker_drop_sequence_dependency(text) function worker_drop_shell_table(text)