PR #6728  / commit - 3

Let nontransactional sync mode create transaction per shell table during dropping the shell tables from worker.
pull/6728/head
aykutbozkurt 2023-03-21 00:46:01 +03:00
parent 85d50203d1
commit 8feb8c634a
8 changed files with 76 additions and 4 deletions

View File

@ -716,11 +716,12 @@ DropMetadataSnapshotOnNode(WorkerNode *workerNode)
* Detach partitions, break dependencies between sequences and table then
* remove shell tables first.
*/
bool singleTransaction = true;
List *dropMetadataCommandList = DetachPartitionCommandList();
dropMetadataCommandList = lappend(dropMetadataCommandList,
BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND);
dropMetadataCommandList = lappend(dropMetadataCommandList,
REMOVE_ALL_SHELL_TABLES_COMMAND);
WorkerDropAllShellTablesCommand(singleTransaction));
dropMetadataCommandList = list_concat(dropMetadataCommandList,
NodeMetadataDropCommands());
dropMetadataCommandList = lappend(dropMetadataCommandList,
@ -4485,3 +4486,19 @@ SendOrCollectCommandListToSingleNode(MetadataSyncContext *context, List *command
pg_unreachable();
}
}
/*
* WorkerDropAllShellTablesCommand returns command required to drop shell tables
* from workers. When singleTransaction is false, we create transaction per shell
* table. Otherwise, we drop all shell tables within single transaction.
*/
char *
WorkerDropAllShellTablesCommand(bool singleTransaction)
{
char *singleTransactionString = (singleTransaction) ? "true" : "false";
StringInfo removeAllShellTablesCommand = makeStringInfo();
appendStringInfo(removeAllShellTablesCommand, WORKER_DROP_ALL_SHELL_TABLES,
singleTransactionString);
return removeAllShellTablesCommand->data;
}

View File

@ -7,3 +7,5 @@ ALTER TABLE pg_catalog.pg_dist_placement REPLICA IDENTITY USING INDEX pg_dist_pl
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy REPLICA IDENTITY USING INDEX pg_dist_rebalance_strategy_name_key;
ALTER TABLE pg_catalog.pg_dist_shard REPLICA IDENTITY USING INDEX pg_dist_shard_shardid_index;
ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY USING INDEX pg_dist_transaction_unique_constraint;
#include "udfs/worker_drop_all_shell_tables/11.3-1.sql"

View File

@ -17,3 +17,5 @@ ALTER TABLE pg_catalog.pg_dist_placement REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_rebalance_strategy REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_shard REPLICA IDENTITY NOTHING;
ALTER TABLE pg_catalog.pg_dist_transaction REPLICA IDENTITY NOTHING;
DROP PROCEDURE pg_catalog.worker_drop_all_shell_tables(bool);

View File

@ -0,0 +1,23 @@
-- During metadata sync, when we send many ddls over single transaction, worker node can error due
-- to reaching at max allocation block size for invalidation messages. To find a workaround for the problem,
-- we added nontransactional metadata sync mode where we create many transaction while dropping shell tables
-- via https://github.com/citusdata/citus/pull/6728.
CREATE OR REPLACE PROCEDURE pg_catalog.worker_drop_all_shell_tables(singleTransaction bool DEFAULT true)
LANGUAGE plpgsql
AS $$
DECLARE
table_name text;
BEGIN
-- drop shell tables within single or multiple transactions according to the flag singleTransaction
FOR table_name IN SELECT logicalrelid::regclass::text FROM pg_dist_partition
LOOP
PERFORM pg_catalog.worker_drop_shell_table(table_name);
IF not singleTransaction THEN
COMMIT;
END IF;
END LOOP;
END;
$$;
COMMENT ON PROCEDURE worker_drop_all_shell_tables(singleTransaction bool)
IS 'drop all distributed tables only without the metadata within single transaction or '
'multiple transaction specified by the flag singleTransaction';

View File

@ -0,0 +1,23 @@
-- During metadata sync, when we send many ddls over single transaction, worker node can error due
-- to reaching at max allocation block size for invalidation messages. To find a workaround for the problem,
-- we added nontransactional metadata sync mode where we create many transaction while dropping shell tables
-- via https://github.com/citusdata/citus/pull/6728.
CREATE OR REPLACE PROCEDURE pg_catalog.worker_drop_all_shell_tables(singleTransaction bool DEFAULT true)
LANGUAGE plpgsql
AS $$
DECLARE
table_name text;
BEGIN
-- drop shell tables within single or multiple transactions according to the flag singleTransaction
FOR table_name IN SELECT logicalrelid::regclass::text FROM pg_dist_partition
LOOP
PERFORM pg_catalog.worker_drop_shell_table(table_name);
IF not singleTransaction THEN
COMMIT;
END IF;
END LOOP;
END;
$$;
COMMENT ON PROCEDURE worker_drop_all_shell_tables(singleTransaction bool)
IS 'drop all distributed tables only without the metadata within single transaction or '
'multiple transaction specified by the flag singleTransaction';

View File

@ -153,14 +153,17 @@ extern void SendOrCollectCommandListToMetadataNodes(MetadataSyncContext *context
extern void SendOrCollectCommandListToSingleNode(MetadataSyncContext *context,
List *commands, int nodeIdx);
extern char * WorkerDropAllShellTablesCommand(bool singleTransaction);
#define DELETE_ALL_NODES "DELETE FROM pg_dist_node"
#define DELETE_ALL_PLACEMENTS "DELETE FROM pg_dist_placement"
#define DELETE_ALL_SHARDS "DELETE FROM pg_dist_shard"
#define DELETE_ALL_DISTRIBUTED_OBJECTS "DELETE FROM pg_catalog.pg_dist_object"
#define DELETE_ALL_PARTITIONS "DELETE FROM pg_dist_partition"
#define DELETE_ALL_COLOCATION "DELETE FROM pg_catalog.pg_dist_colocation"
#define REMOVE_ALL_SHELL_TABLES_COMMAND \
"SELECT worker_drop_shell_table(logicalrelid::regclass::text) FROM pg_dist_partition"
#define WORKER_DROP_ALL_SHELL_TABLES \
"CALL pg_catalog.worker_drop_all_shell_tables(%s)"
#define REMOVE_ALL_CITUS_TABLES_COMMAND \
"SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition"
#define BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND \

View File

@ -1366,7 +1366,8 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_internal_start_replication_origin_tracking() void
| function citus_internal_stop_replication_origin_tracking() void
| function worker_adjust_identity_column_seq_ranges(regclass) void
(4 rows)
| function worker_drop_all_shell_tables(boolean)
(5 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -245,6 +245,7 @@ ORDER BY 1;
function worker_create_or_replace_object(text)
function worker_create_or_replace_object(text[])
function worker_create_truncate_trigger(regclass)
function worker_drop_all_shell_tables(boolean)
function worker_drop_distributed_table(text)
function worker_drop_sequence_dependency(text)
function worker_drop_shell_table(text)