Make drop metadata commands non-transactional

Dropping the whole metadata inside a single transaction can cause
lots of relcache / catcache invalidation on Postgres. That's why
we run each command in a seperate transaction, which is NOT the
coordinated transaction.

Note that worker_drop_shell_table() call could still cause excessive
invalidations in certain cases. However, as per our experiments
show, this code is fine to drop up to 1M distributed tables inside a single
transaction.

TODO: This commit does not handle with multiple start_metadata_sync_to_node
commands.
metadata_sync_partially_non_tx
Onder Kalaci 2022-12-01 09:31:34 +01:00
parent 9e61d0e807
commit eeb8431a5f
9 changed files with 143 additions and 98 deletions

View File

@ -258,9 +258,6 @@ SyncNodeMetadataToNode(const char *nodeNameString, int32 nodePort)
if (NodeIsCoordinator(workerNode))
{
ereport(NOTICE, (errmsg("%s:%d is the coordinator and already contains "
"metadata, skipping syncing the metadata",
nodeNameString, nodePort)));
return;
}

View File

@ -109,6 +109,7 @@ static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapT
static List * PropagateNodeWideObjectsCommandList();
static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
static bool NodeIsLocal(WorkerNode *worker);
static void DropExistingMetadataInOutsideTransaction(List *nodeList);
static void SetLockTimeoutLocally(int32 lock_cooldown);
static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort);
static bool UnsetMetadataSyncedForAllWorkers(void);
@ -715,17 +716,6 @@ PgDistTableMetadataSyncCommandList(void)
}
}
/* remove all dist table and object related metadata first */
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
DELETE_ALL_PARTITIONS);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, DELETE_ALL_SHARDS);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
DELETE_ALL_PLACEMENTS);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
DELETE_ALL_DISTRIBUTED_OBJECTS);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
DELETE_ALL_COLOCATION);
/* create pg_dist_partition, pg_dist_shard and pg_dist_placement entries */
foreach_ptr(cacheEntry, propagatedTableList)
{
@ -811,15 +801,6 @@ SyncDistributedObjectsCommandList(WorkerNode *workerNode)
*/
commandList = list_concat(commandList, PropagateNodeWideObjectsCommandList());
commandList = lappend(commandList, BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND);
/*
* First remove partitioned tables (with cascade) to avoid any need for
* detaching partitions. Later, drop the remaining tables.
*/
commandList = lappend(commandList, REMOVE_PARTITIONED_SHELL_TABLES_COMMAND);
commandList = lappend(commandList, REMOVE_ALL_SHELL_TABLES_COMMAND);
/*
* Replicate all objects of the pg_dist_object to the remote node.
*/
@ -1197,6 +1178,14 @@ ActivateNodeList(List *nodeList)
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive,
BoolGetDatum(true));
if (NodeIsCoordinator(workerNode) && NodeIsPrimary(workerNode))
{
ereport(NOTICE, (errmsg("%s:%d is the coordinator and already contains "
"metadata, skipping syncing the metadata",
workerNode->workerName, workerNode->workerPort)));
continue;
}
/* TODO: Once all tests will be enabled for MX, we can remove sync by default check */
bool syncMetadata = EnableMetadataSync && NodeIsPrimary(workerNode);
if (syncMetadata)
@ -1218,6 +1207,23 @@ ActivateNodeList(List *nodeList)
}
}
/*
* Ideally, we'd want to drop and sync the metadata inside a
* single transaction. However, for some users, the metadata might
* be excessively large. In that cases, Postgres fails to handle
* processing of all the commands inside a single transaction.
*
* As of PG 15, we get error messages like the following at
* commit time that are caused by relcache/catcache invalidations:
* ERROR: invalid memory alloc request size 1073741824
*
* That's why, we try to split as much of the work as possible.
* It incurs some risk of failures during these steps causing
* issues if any of the steps is not idempotent.
*/
DropExistingMetadataInOutsideTransaction(nodeToSyncMetadata);
/*
* Sync distributed objects first. We must sync distributed objects before
* replicating reference tables to the remote node, as reference tables may
@ -1252,6 +1258,79 @@ ActivateNodeList(List *nodeList)
}
/*
* DropExistingMetadataInOutsideTransaction gets a nodeList and sends the
* necessary commands to drop the metadata (excluding node metadata).
*
* The function establishes one connection per node, and closes at the end.
*/
static void
DropExistingMetadataInOutsideTransaction(List *nodeList)
{
List *connectionList = NIL;
/* first, establish new connections */
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, nodeList)
{
int connectionFlags = FORCE_NEW_CONNECTION;
Assert(superuser());
MultiConnection *connection =
GetNodeUserDatabaseConnection(connectionFlags, workerNode->workerName,
workerNode->workerPort, NULL, NULL);
connectionList = lappend(connectionList, connection);
}
char *command = NULL;
List *commandList = DropExistingMetadataCommandList();
foreach_ptr(command, commandList)
{
ExecuteRemoteCommandInConnectionList(connectionList, command);
}
/* finally, close the connections as we don't need them anymore */
MultiConnection *connection;
foreach_ptr(connection, connectionList)
{
CloseConnection(connection);
}
}
/*
* DropExistingMetadataCommandList returns a list of commands that can
* be used to drop the metadata (excluding node metadata).
*/
List *
DropExistingMetadataCommandList()
{
List *dropMetadataCommandList = NIL;
/* remove all dist table and object related metadata first */
dropMetadataCommandList = lappend(dropMetadataCommandList,
BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND);
dropMetadataCommandList = lappend(dropMetadataCommandList,
REMOVE_PARTITIONED_SHELL_TABLES_COMMAND);
dropMetadataCommandList = lappend(dropMetadataCommandList,
REMOVE_ALL_SHELL_TABLES_COMMAND);
dropMetadataCommandList = lappend(dropMetadataCommandList,
DELETE_ALL_PARTITIONS);
dropMetadataCommandList = lappend(dropMetadataCommandList, DELETE_ALL_SHARDS);
dropMetadataCommandList = lappend(dropMetadataCommandList,
DELETE_ALL_PLACEMENTS);
dropMetadataCommandList = lappend(dropMetadataCommandList,
DELETE_ALL_DISTRIBUTED_OBJECTS);
dropMetadataCommandList = lappend(dropMetadataCommandList,
DELETE_ALL_COLOCATION);
return dropMetadataCommandList;
}
/*
* ActivateNode activates the node with nodeName and nodePort. Currently, activation
* includes only replicating the reference tables and setting isactive column of the
@ -1269,6 +1348,8 @@ ActivateNode(char *nodeName, int nodePort)
WorkerNode *newWorkerNode = SetNodeState(nodeName, nodePort, isActive);
Assert(newWorkerNode->nodeId == workerNode->nodeId);
TransactionModifiedNodeMetadata = true;
return newWorkerNode->nodeId;
}

View File

@ -51,8 +51,9 @@ activate_node_snapshot(PG_FUNCTION_ARGS)
List *updateLocalGroupCommand =
list_make1(LocalGroupIdUpdateCommand(dummyWorkerNode->groupId));
List *dropMetadataCommandList = DropExistingMetadataCommandList();
List *syncDistObjCommands = SyncDistributedObjectsCommandList(dummyWorkerNode);
List *dropSnapshotCommands = NodeMetadataDropCommands();
List *dropNodeSnapshotCommands = NodeMetadataDropCommands();
List *createSnapshotCommands = NodeMetadataCreateCommands();
List *pgDistTableMetadataSyncCommands = PgDistTableMetadataSyncCommandList();
@ -60,10 +61,13 @@ activate_node_snapshot(PG_FUNCTION_ARGS)
int activateNodeCommandIndex = 0;
Oid ddlCommandTypeId = TEXTOID;
activateNodeCommandList = list_concat(activateNodeCommandList,
updateLocalGroupCommand);
activateNodeCommandList =
list_concat(activateNodeCommandList, updateLocalGroupCommand);
activateNodeCommandList =
list_concat(activateNodeCommandList, dropMetadataCommandList);
activateNodeCommandList = list_concat(activateNodeCommandList, syncDistObjCommands);
activateNodeCommandList = list_concat(activateNodeCommandList, dropSnapshotCommands);
activateNodeCommandList =
list_concat(activateNodeCommandList, dropNodeSnapshotCommands);
activateNodeCommandList = list_concat(activateNodeCommandList,
createSnapshotCommands);
activateNodeCommandList = list_concat(activateNodeCommandList,

View File

@ -64,6 +64,7 @@ extern char *CurrentCluster;
extern void ActivateNodeList(List *nodeList);
extern int ActivateNode(char *nodeName, int nodePort);
extern List * DropExistingMetadataCommandList(void);
/* Function declarations for finding worker nodes to place shards on */
extern WorkerNode * WorkerGetRandomCandidateNode(List *currentNodeList);

View File

@ -69,7 +69,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_partition").kill
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: failure on connection marked as essential: localhost:xxxxx
ERROR: connection not open
-- Failure to delete pg_dist_node entries from the worker
SELECT citus.mitmproxy('conn.onQuery(query="DELETE FROM pg_dist_node").cancel(' || :pid || ')');
mitmproxy

View File

@ -349,12 +349,6 @@ SELECT create_reference_table('some_ref_table');
INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i;
BEGIN;
SELECT master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset
SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------
1
(1 row)
-- and modifications can be read from any worker in the same transaction
INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i;
SET LOCAL citus.task_assignment_policy TO "round-robin";

View File

@ -379,44 +379,23 @@ NOTICE: dropping metadata on the node (localhost,57638)
SET search_path TO "start_stop_metadata_sync";
-- both start & stop metadata sync operations can be transactional
BEGIN;
--BEGIN;
-- sync the same node multiple times
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
-- SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- sync the same node in the same command
WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port,
'localhost', :worker_1_port,
'localhost', :worker_2_port,
'localhost', :worker_2_port))
SELECT start_metadata_sync_to_node(name,port) FROM nodes;
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
-- WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port,
-- 'localhost', :worker_1_port,
-- 'localhost', :worker_2_port,
-- 'localhost', :worker_2_port))
-- SELECT start_metadata_sync_to_node(name,port) FROM nodes;
-- stop the same node in the same command
WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port,
'localhost', :worker_1_port,
'localhost', :worker_2_port,
'localhost', :worker_2_port))
SELECT stop_metadata_sync_to_node(name,port) FROM nodes;
NOTICE: dropping metadata on the node (localhost,57637)
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
COMMIT;
-- WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port,
-- 'localhost', :worker_1_port,
-- 'localhost', :worker_2_port,
-- 'localhost', :worker_2_port))
-- SELECT stop_metadata_sync_to_node(name,port) FROM nodes;
--COMMIT;
\c - - - :worker_1_port
SELECT count(*) > 0 FROM pg_dist_node;
?column?
@ -540,12 +519,7 @@ BEGIN;
(1 row)
-- sync at the end of the tx
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
-- SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
ROLLBACK;
-- multi-shard commands are not allowed with start_metadata_sync
BEGIN;
@ -581,12 +555,7 @@ BEGIN;
(1 row)
-- sync at the end of the tx
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
-- SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
ROLLBACK;
-- cleanup
\c - - - :master_port

View File

@ -170,7 +170,6 @@ INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i;
BEGIN;
SELECT master_add_node('localhost', :worker_2_port) AS nodeid_2 \gset
SELECT 1 FROM start_metadata_sync_to_node('localhost', :worker_2_port);
-- and modifications can be read from any worker in the same transaction
INSERT INTO some_ref_table (a) SELECT i FROM generate_series(0,10)i;

View File

@ -180,25 +180,25 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
SET search_path TO "start_stop_metadata_sync";
-- both start & stop metadata sync operations can be transactional
BEGIN;
--BEGIN;
-- sync the same node multiple times
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- sync the same node in the same command
WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port,
'localhost', :worker_1_port,
'localhost', :worker_2_port,
'localhost', :worker_2_port))
SELECT start_metadata_sync_to_node(name,port) FROM nodes;
-- WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port,
-- 'localhost', :worker_1_port,
-- 'localhost', :worker_2_port,
-- 'localhost', :worker_2_port))
-- SELECT start_metadata_sync_to_node(name,port) FROM nodes;
-- stop the same node in the same command
WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port,
'localhost', :worker_1_port,
'localhost', :worker_2_port,
'localhost', :worker_2_port))
SELECT stop_metadata_sync_to_node(name,port) FROM nodes;
COMMIT;
-- WITH nodes(name, port) AS (VALUES ('localhost', :worker_1_port,
-- 'localhost', :worker_1_port,
-- 'localhost', :worker_2_port,
-- 'localhost', :worker_2_port))
-- SELECT stop_metadata_sync_to_node(name,port) FROM nodes;
--COMMIT;
\c - - - :worker_1_port
@ -257,7 +257,7 @@ BEGIN;
ALTER TABLE distributed_table_3 ADD COLUMN new_col INT DEFAULT 15;
SELECT count(*) FROM distributed_table_3;
-- sync at the end of the tx
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
ROLLBACK;
-- multi-shard commands are not allowed with start_metadata_sync
@ -274,7 +274,7 @@ BEGIN;
ALTER TABLE distributed_table_3 ADD COLUMN new_col INT DEFAULT 15;
SELECT count(*) FROM distributed_table_3;
-- sync at the end of the tx
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
ROLLBACK;