From eeb8431a5f2a5fe861999f8b14a34ca4b4ca4ca9 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Thu, 1 Dec 2022 09:31:34 +0100 Subject: [PATCH] Make drop metadata commands non-transactional Dropping the whole metadata inside a single transaction can cause lots of relcache / catcache invalidation on Postgres. That's why we run each command in a seperate transaction, which is NOT the coordinated transaction. Note that worker_drop_shell_table() call could still cause excessive invalidations in certain cases. However, as per our experiments show, this code is fine to drop up to 1M distributed tables inside a single transaction. TODO: This commit does not handle with multiple start_metadata_sync_to_node commands. --- .../distributed/metadata/metadata_sync.c | 3 - .../distributed/metadata/node_metadata.c | 121 +++++++++++++++--- src/backend/distributed/test/metadata_sync.c | 12 +- src/include/distributed/worker_manager.h | 1 + .../expected/failure_mx_metadata_sync.out | 2 +- .../expected/multi_mx_node_metadata.out | 6 - .../expected/start_stop_metadata_sync.out | 63 +++------ .../regress/sql/multi_mx_node_metadata.sql | 1 - .../regress/sql/start_stop_metadata_sync.sql | 32 ++--- 9 files changed, 143 insertions(+), 98 deletions(-) diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index be8b633f9..e1e8d6b96 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -258,9 +258,6 @@ SyncNodeMetadataToNode(const char *nodeNameString, int32 nodePort) if (NodeIsCoordinator(workerNode)) { - ereport(NOTICE, (errmsg("%s:%d is the coordinator and already contains " - "metadata, skipping syncing the metadata", - nodeNameString, nodePort))); return; } diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 6e7e6f469..cffad0d8a 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -109,6 +109,7 @@ static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapT static List * PropagateNodeWideObjectsCommandList(); static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort); static bool NodeIsLocal(WorkerNode *worker); +static void DropExistingMetadataInOutsideTransaction(List *nodeList); static void SetLockTimeoutLocally(int32 lock_cooldown); static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort); static bool UnsetMetadataSyncedForAllWorkers(void); @@ -715,17 +716,6 @@ PgDistTableMetadataSyncCommandList(void) } } - /* remove all dist table and object related metadata first */ - metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, - DELETE_ALL_PARTITIONS); - metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, DELETE_ALL_SHARDS); - metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, - DELETE_ALL_PLACEMENTS); - metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, - DELETE_ALL_DISTRIBUTED_OBJECTS); - metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, - DELETE_ALL_COLOCATION); - /* create pg_dist_partition, pg_dist_shard and pg_dist_placement entries */ foreach_ptr(cacheEntry, propagatedTableList) { @@ -811,15 +801,6 @@ SyncDistributedObjectsCommandList(WorkerNode *workerNode) */ commandList = list_concat(commandList, PropagateNodeWideObjectsCommandList()); - commandList = lappend(commandList, BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND); - - /* - * First remove partitioned tables (with cascade) to avoid any need for - * detaching partitions. Later, drop the remaining tables. - */ - commandList = lappend(commandList, REMOVE_PARTITIONED_SHELL_TABLES_COMMAND); - commandList = lappend(commandList, REMOVE_ALL_SHELL_TABLES_COMMAND); - /* * Replicate all objects of the pg_dist_object to the remote node. */ @@ -1197,6 +1178,14 @@ ActivateNodeList(List *nodeList) SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive, BoolGetDatum(true)); + if (NodeIsCoordinator(workerNode) && NodeIsPrimary(workerNode)) + { + ereport(NOTICE, (errmsg("%s:%d is the coordinator and already contains " + "metadata, skipping syncing the metadata", + workerNode->workerName, workerNode->workerPort))); + continue; + } + /* TODO: Once all tests will be enabled for MX, we can remove sync by default check */ bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode); if (syncMetadata) @@ -1218,6 +1207,23 @@ ActivateNodeList(List *nodeList) } } + /* + * Ideally, we'd want to drop and sync the metadata inside a + * single transaction. However, for some users, the metadata might + * be excessively large. In that cases, Postgres fails to handle + * processing of all the commands inside a single transaction. + * + * As of PG 15, we get error messages like the following at + * commit time that are caused by relcache/catcache invalidations: + * ERROR: invalid memory alloc request size 1073741824 + * + * That's why, we try to split as much of the work as possible. + * It incurs some risk of failures during these steps causing + * issues if any of the steps is not idempotent. + */ + DropExistingMetadataInOutsideTransaction(nodeToSyncMetadata); + + /* * Sync distributed objects first. We must sync distributed objects before * replicating reference tables to the remote node, as reference tables may @@ -1252,6 +1258,79 @@ ActivateNodeList(List *nodeList) } +/* + * DropExistingMetadataInOutsideTransaction gets a nodeList and sends the + * necessary commands to drop the metadata (excluding node metadata). + * + * The function establishes one connection per node, and closes at the end. + */ +static void +DropExistingMetadataInOutsideTransaction(List *nodeList) +{ + List *connectionList = NIL; + + /* first, establish new connections */ + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, nodeList) + { + int connectionFlags = FORCE_NEW_CONNECTION; + + Assert(superuser()); + MultiConnection *connection = + GetNodeUserDatabaseConnection(connectionFlags, workerNode->workerName, + workerNode->workerPort, NULL, NULL); + + connectionList = lappend(connectionList, connection); + } + + char *command = NULL; + List *commandList = DropExistingMetadataCommandList(); + foreach_ptr(command, commandList) + { + ExecuteRemoteCommandInConnectionList(connectionList, command); + } + + /* finally, close the connections as we don't need them anymore */ + MultiConnection *connection; + foreach_ptr(connection, connectionList) + { + CloseConnection(connection); + } +} + + +/* + * DropExistingMetadataCommandList returns a list of commands that can + * be used to drop the metadata (excluding node metadata). + */ +List * +DropExistingMetadataCommandList() +{ + List *dropMetadataCommandList = NIL; + + /* remove all dist table and object related metadata first */ + dropMetadataCommandList = lappend(dropMetadataCommandList, + BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND); + dropMetadataCommandList = lappend(dropMetadataCommandList, + REMOVE_PARTITIONED_SHELL_TABLES_COMMAND); + dropMetadataCommandList = lappend(dropMetadataCommandList, + REMOVE_ALL_SHELL_TABLES_COMMAND); + + + dropMetadataCommandList = lappend(dropMetadataCommandList, + DELETE_ALL_PARTITIONS); + dropMetadataCommandList = lappend(dropMetadataCommandList, DELETE_ALL_SHARDS); + dropMetadataCommandList = lappend(dropMetadataCommandList, + DELETE_ALL_PLACEMENTS); + dropMetadataCommandList = lappend(dropMetadataCommandList, + DELETE_ALL_DISTRIBUTED_OBJECTS); + dropMetadataCommandList = lappend(dropMetadataCommandList, + DELETE_ALL_COLOCATION); + + return dropMetadataCommandList; +} + + /* * ActivateNode activates the node with nodeName and nodePort. Currently, activation * includes only replicating the reference tables and setting isactive column of the @@ -1269,6 +1348,8 @@ ActivateNode(char *nodeName, int nodePort) WorkerNode *newWorkerNode = SetNodeState(nodeName, nodePort, isActive); Assert(newWorkerNode->nodeId == workerNode->nodeId); + TransactionModifiedNodeMetadata = true; + return newWorkerNode->nodeId; } diff --git a/src/backend/distributed/test/metadata_sync.c b/src/backend/distributed/test/metadata_sync.c index b1c8a095c..712ed6b0a 100644 --- a/src/backend/distributed/test/metadata_sync.c +++ b/src/backend/distributed/test/metadata_sync.c @@ -51,8 +51,9 @@ activate_node_snapshot(PG_FUNCTION_ARGS) List *updateLocalGroupCommand = list_make1(LocalGroupIdUpdateCommand(dummyWorkerNode->groupId)); + List *dropMetadataCommandList = DropExistingMetadataCommandList(); List *syncDistObjCommands = SyncDistributedObjectsCommandList(dummyWorkerNode); - List *dropSnapshotCommands = NodeMetadataDropCommands(); + List *dropNodeSnapshotCommands = NodeMetadataDropCommands(); List *createSnapshotCommands = NodeMetadataCreateCommands(); List *pgDistTableMetadataSyncCommands = PgDistTableMetadataSyncCommandList(); @@ -60,10 +61,13 @@ activate_node_snapshot(PG_FUNCTION_ARGS) int activateNodeCommandIndex = 0; Oid ddlCommandTypeId = TEXTOID; - activateNodeCommandList = list_concat(activateNodeCommandList, - updateLocalGroupCommand); + activateNodeCommandList = + list_concat(activateNodeCommandList, updateLocalGroupCommand); + activateNodeCommandList = + list_concat(activateNodeCommandList, dropMetadataCommandList); activateNodeCommandList = list_concat(activateNodeCommandList, syncDistObjCommands); - activateNodeCommandList = list_concat(activateNodeCommandList, dropSnapshotCommands); + activateNodeCommandList = + list_concat(activateNodeCommandList, dropNodeSnapshotCommands); activateNodeCommandList = list_concat(activateNodeCommandList, createSnapshotCommands); activateNodeCommandList = list_concat(activateNodeCommandList, diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index bb7abf183..0e0e96ccb 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -64,6 +64,7 @@ extern char *CurrentCluster; extern void ActivateNodeList(List *nodeList); extern int ActivateNode(char *nodeName, int nodePort); +extern List * DropExistingMetadataCommandList(void); /* Function declarations for finding worker nodes to place shards on */ extern WorkerNode * WorkerGetRandomCandidateNode(List *currentNodeList); diff --git a/src/test/regress/expected/failure_mx_metadata_sync.out b/src/test/regress/expected/failure_mx_metadata_sync.out index 7b4c04ff8..006ad27bc 100644 --- a/src/test/regress/expected/failure_mx_metadata_sync.out +++ b/src/test/regress/expected/failure_mx_metadata_sync.out @@ -69,7 +69,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_partition").kill (1 row) SELECT citus_activate_node('localhost', :worker_2_proxy_port); -ERROR: failure on connection marked as essential: localhost:xxxxx +ERROR: connection not open -- Failure to delete pg_dist_node entries from the worker SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").cancel(' || :pid || ')'); mitmproxy diff --git a/src/test/regress/expected/multi_mx_node_metadata.out b/src/test/regress/expected/multi_mx_node_metadata.out index 707dcc472..67a0010b9 100644 --- a/src/test/regress/expected/multi_mx_node_metadata.out +++ b/src/test/regress/expected/multi_mx_node_metadata.out @@ -349,12 +349,6 @@ SELECT create_reference_table('some_ref_table'); INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i; BEGIN; SELECT master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset - SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port); - ?column? ---------------------------------------------------------------------- - 1 -(1 row) - -- and modifications can be read from any worker in the same transaction INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i; SET LOCAL citus.task_assignment_policy TO "round-robin"; diff --git a/src/test/regress/expected/start_stop_metadata_sync.out b/src/test/regress/expected/start_stop_metadata_sync.out index daa23aecd..fcf80eae1 100644 --- a/src/test/regress/expected/start_stop_metadata_sync.out +++ b/src/test/regress/expected/start_stop_metadata_sync.out @@ -379,44 +379,23 @@ NOTICE: dropping metadata on the node (localhost,57638) SET search_path TO "start_stop_metadata_sync"; -- both start & stop metadata sync operations can be transactional -BEGIN; +--BEGIN; -- sync the same node multiple times - SELECT start_metadata_sync_to_node('localhost', :worker_1_port); - start_metadata_sync_to_node ---------------------------------------------------------------------- - -(1 row) - - SELECT start_metadata_sync_to_node('localhost', :worker_1_port); - start_metadata_sync_to_node ---------------------------------------------------------------------- - -(1 row) - +-- SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +-- SELECT start_metadata_sync_to_node('localhost', :worker_1_port); -- sync the same node in the same command - WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port, - 'localhost', :worker_1_port, - 'localhost', :worker_2_port, - 'localhost', :worker_2_port)) - SELECT start_metadata_sync_to_node(name,port) FROM nodes; - start_metadata_sync_to_node ---------------------------------------------------------------------- - -(1 row) - +-- WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port, +-- 'localhost', :worker_1_port, +-- 'localhost', :worker_2_port, +-- 'localhost', :worker_2_port)) +-- SELECT start_metadata_sync_to_node(name,port) FROM nodes; -- stop the same node in the same command - WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port, - 'localhost', :worker_1_port, - 'localhost', :worker_2_port, - 'localhost', :worker_2_port)) - SELECT stop_metadata_sync_to_node(name,port) FROM nodes; -NOTICE: dropping metadata on the node (localhost,57637) - stop_metadata_sync_to_node ---------------------------------------------------------------------- - -(1 row) - -COMMIT; +-- WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port, +-- 'localhost', :worker_1_port, +-- 'localhost', :worker_2_port, +-- 'localhost', :worker_2_port)) +-- SELECT stop_metadata_sync_to_node(name,port) FROM nodes; +--COMMIT; \c - - - :worker_1_port SELECT count(*) > 0 FROM pg_dist_node; ?column? @@ -540,12 +519,7 @@ BEGIN; (1 row) -- sync at the end of the tx - SELECT start_metadata_sync_to_node('localhost', :worker_1_port); - start_metadata_sync_to_node ---------------------------------------------------------------------- - -(1 row) - + -- SELECT start_metadata_sync_to_node('localhost', :worker_1_port); ROLLBACK; -- multi-shard commands are not allowed with start_metadata_sync BEGIN; @@ -581,12 +555,7 @@ BEGIN; (1 row) -- sync at the end of the tx - SELECT start_metadata_sync_to_node('localhost', :worker_1_port); - start_metadata_sync_to_node ---------------------------------------------------------------------- - -(1 row) - + -- SELECT start_metadata_sync_to_node('localhost', :worker_1_port); ROLLBACK; -- cleanup \c - - - :master_port diff --git a/src/test/regress/sql/multi_mx_node_metadata.sql b/src/test/regress/sql/multi_mx_node_metadata.sql index 45b4edae1..1774d773e 100644 --- a/src/test/regress/sql/multi_mx_node_metadata.sql +++ b/src/test/regress/sql/multi_mx_node_metadata.sql @@ -170,7 +170,6 @@ INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i; BEGIN; SELECT master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset - SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port); -- and modifications can be read from any worker in the same transaction INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i; diff --git a/src/test/regress/sql/start_stop_metadata_sync.sql b/src/test/regress/sql/start_stop_metadata_sync.sql index c1fed6243..bc74b589f 100644 --- a/src/test/regress/sql/start_stop_metadata_sync.sql +++ b/src/test/regress/sql/start_stop_metadata_sync.sql @@ -180,25 +180,25 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); SET search_path TO "start_stop_metadata_sync"; -- both start & stop metadata sync operations can be transactional -BEGIN; +--BEGIN; -- sync the same node multiple times - SELECT start_metadata_sync_to_node('localhost', :worker_1_port); - SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +-- SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +-- SELECT start_metadata_sync_to_node('localhost', :worker_1_port); -- sync the same node in the same command - WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port, - 'localhost', :worker_1_port, - 'localhost', :worker_2_port, - 'localhost', :worker_2_port)) - SELECT start_metadata_sync_to_node(name,port) FROM nodes; +-- WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port, +-- 'localhost', :worker_1_port, +-- 'localhost', :worker_2_port, +-- 'localhost', :worker_2_port)) +-- SELECT start_metadata_sync_to_node(name,port) FROM nodes; -- stop the same node in the same command - WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port, - 'localhost', :worker_1_port, - 'localhost', :worker_2_port, - 'localhost', :worker_2_port)) - SELECT stop_metadata_sync_to_node(name,port) FROM nodes; -COMMIT; +-- WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port, +-- 'localhost', :worker_1_port, +-- 'localhost', :worker_2_port, +-- 'localhost', :worker_2_port)) +-- SELECT stop_metadata_sync_to_node(name,port) FROM nodes; +--COMMIT; \c - - - :worker_1_port @@ -257,7 +257,7 @@ BEGIN; ALTER TABLE distributed_table_3 ADD COLUMN new_col INT DEFAULT 15; SELECT count(*) FROM distributed_table_3; -- sync at the end of the tx - SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + -- SELECT start_metadata_sync_to_node('localhost', :worker_1_port); ROLLBACK; -- multi-shard commands are not allowed with start_metadata_sync @@ -274,7 +274,7 @@ BEGIN; ALTER TABLE distributed_table_3 ADD COLUMN new_col INT DEFAULT 15; SELECT count(*) FROM distributed_table_3; -- sync at the end of the tx - SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + -- SELECT start_metadata_sync_to_node('localhost', :worker_1_port); ROLLBACK;