From d2f2acc4b2c4e53e2490e7cec1ac64fcd28d98f6 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Tue, 17 Sep 2019 16:36:51 -0700 Subject: [PATCH] Make master_update_node citus-ha friendly. --- src/backend/distributed/utils/node_metadata.c | 260 +++++------------- .../isolation_dump_global_wait_edges.out | 18 +- .../isolation_dump_global_wait_edges_0.out | 18 +- .../expected/isolation_update_node.out | 13 +- .../expected/multi_mx_master_update_node.out | 67 ++++- .../sql/multi_mx_master_update_node.sql | 31 ++- 6 files changed, 183 insertions(+), 224 deletions(-) diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index fb8b04583..7c202d550 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -67,9 +67,6 @@ typedef struct NodeMetadata } NodeMetadata; /* local function forward declarations */ -static List * WorkerListDelete(List *workerList, uint32 nodeId); -static List * SyncedMetadataNodeList(void); -static void SyncDistNodeEntryToNodes(WorkerNode *workerNode, List *metadataWorkers); static int ActivateNode(char *nodeName, int nodePort); static void RemoveNodeFromCluster(char *nodeName, int32 nodePort); static int AddNodeMetadata(char *nodeName, int32 nodePort, NodeMetadata @@ -84,9 +81,7 @@ static void DeleteNodeRow(char *nodename, int32 nodeport); static List * ParseWorkerNodeFileAndRename(void); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort); -static bool * SendOptionalCommandListToWorkers(List *workerNodeList, - List *commandList, - const char *nodeUser); +static bool UnsetMetadataSyncedForAll(void); /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(master_add_node); @@ -515,7 +510,6 @@ master_update_node(PG_FUNCTION_ARGS) WorkerNode *workerNode = NULL; WorkerNode *workerNodeWithSameAddress = NULL; List 
*placementList = NIL; - List *metadataWorkersToSync = NIL; BackgroundWorkerHandle *handle = NULL; CheckCitusVersion(ERROR); @@ -593,25 +587,21 @@ master_update_node(PG_FUNCTION_ARGS) /* * Propagate the updated pg_dist_node entry to all metadata workers. - * The usual case is that the new node is in stand-by mode, so we don't - * sync the changes to the new node, and instead schedule it for being - * synced by the maintenance daemon. + * citus-ha uses master_update_node() in a prepared transaction, and + * we don't support coordinated prepared transactions, so we cannot + * propagate the changes to the worker nodes here. Instead we mark + * all metadata nodes as not-synced and ask maintenanced to do the + * propagation. * * It is possible that maintenance daemon does the first resync too * early, but that's fine, since this will start a retry loop with * 5 second intervals until sync is complete. */ - metadataWorkersToSync = SyncedMetadataNodeList(); - if (workerNode->hasMetadata) + if (UnsetMetadataSyncedForAll()) { - metadataWorkersToSync = WorkerListDelete(metadataWorkersToSync, nodeId); - MarkNodeMetadataSynced(workerNode->workerName, - workerNode->workerPort, false); TriggerMetadataSync(MyDatabaseId); } - SyncDistNodeEntryToNodes(workerNode, metadataWorkersToSync); - if (handle != NULL) { /* @@ -625,176 +615,6 @@ master_update_node(PG_FUNCTION_ARGS) } -/* MetadataNodeList returns list of all synced metadata workers. */ -static List * -SyncedMetadataNodeList(void) -{ - List *metadataNodeList = NIL; - List *activePrimaries = ActivePrimaryNodeList(AccessShareLock); - ListCell *workerNodeCell = NULL; - - foreach(workerNodeCell, activePrimaries) - { - WorkerNode *workerNode = lfirst(workerNodeCell); - if (workerNode->hasMetadata && workerNode->metadataSynced) - { - metadataNodeList = lappend(metadataNodeList, workerNode); - } - } - - return metadataNodeList; -} - - -/* - * WorkerListDelete removes the worker node with the given id - * from the list of given workers. 
- */ -static List * -WorkerListDelete(List *workerList, uint32 nodeId) -{ - List *filteredWorkerList = NIL; - ListCell *workerCell = NULL; - - foreach(workerCell, workerList) - { - WorkerNode *workerNode = lfirst(workerCell); - if (workerNode->nodeId != nodeId) - { - filteredWorkerList = lappend(filteredWorkerList, workerNode); - } - } - - return filteredWorkerList; -} - - -/* - * SyncDistNodeEntryToNodes synchronizes the corresponding entry for - * the given workerNode in pg_dist_node metadata table of given - * metadataWorkers. If syncing to a node fails, pg_distnode.metadatasynced - * is set to false. - */ -static void -SyncDistNodeEntryToNodes(WorkerNode *nodeToSync, List *metadataWorkers) -{ - ListCell *workerCell = NULL; - char *extensionOwner = CitusExtensionOwnerName(); - char *nodeDeleteCommand = NodeDeleteCommand(nodeToSync->nodeId); - char *nodeInsertCommand = NodeListInsertCommand(list_make1(nodeToSync)); - List *commandList = list_make2(nodeDeleteCommand, nodeInsertCommand); - bool *workerFailed = SendOptionalCommandListToWorkers(metadataWorkers, commandList, - extensionOwner); - - int workerIndex = 0; - foreach(workerCell, metadataWorkers) - { - WorkerNode *workerNode = lfirst(workerCell); - - if (workerFailed[workerIndex]) - { - MarkNodeMetadataSynced(workerNode->workerName, workerNode->workerPort, false); - } - - workerIndex++; - } -} - - -/* - * SendOptionalCommandListToWorkers sends the given command list to the given - * worker list, and returns a bool[] indicating whether execution of the command - * list failed at a worker or not. - * - * If execution fails at a worker because of connection error, this function just - * emits a warning for that node. If execution fails because of a result error, - * the node is rolled-back to the status before this function so further queries - * can be sent to the node. 
- */ -static bool * -SendOptionalCommandListToWorkers(List *workerNodeList, List *commandList, - const char *nodeUser) -{ - List *connectionList = NIL; - ListCell *commandCell = NULL; - ListCell *connectionCell = NULL; - bool *workerFailed = palloc0(sizeof(workerNodeList)); - char *beginSavepointCommand = "SAVEPOINT sp_node_metadata"; - char *rollbackSavepointCommand = "ROLLBACK TO SAVEPOINT sp_node_metadata"; - char *releaseSavepointCommand = "RELEASE SAVEPOINT sp_node_metadata"; - - BeginOrContinueCoordinatedTransaction(); - - /* open connections in parallel */ - connectionList = StartWorkerListConnections(workerNodeList, 0, nodeUser, NULL); - FinishConnectionListEstablishment(connectionList); - - RemoteTransactionsBeginIfNecessary(connectionList); - - commandList = lcons(beginSavepointCommand, commandList); - commandList = lappend(commandList, releaseSavepointCommand); - - foreach(commandCell, commandList) - { - char *command = lfirst(commandCell); - int workerIndex = 0; - foreach(connectionCell, connectionList) - { - MultiConnection *connection = lfirst(connectionCell); - - if (!workerFailed[workerIndex]) - { - if (SendRemoteCommand(connection, command) == 0) - { - ReportConnectionError(connection, WARNING); - workerFailed[workerIndex] = true; - } - } - - workerIndex++; - } - - workerIndex = 0; - foreach(connectionCell, connectionList) - { - MultiConnection *connection = lfirst(connectionCell); - bool raiseInterrupts = true; - PGresult *commandResult = NULL; - bool responseOK = false; - - if (workerFailed[workerIndex]) - { - workerIndex++; - continue; - } - - commandResult = GetRemoteCommandResult(connection, raiseInterrupts); - responseOK = IsResponseOK(commandResult); - - if (!responseOK) - { - ReportResultError(connection, commandResult, WARNING); - workerFailed[workerIndex] = true; - - PQclear(commandResult); - ForgetResults(connection); - - ExecuteOptionalRemoteCommand(connection, rollbackSavepointCommand, NULL); - } - else - { - PQclear(commandResult); - 
ForgetResults(connection); - } - - workerIndex++; - } - } - - return workerFailed; -} - - static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort) { @@ -1783,3 +1603,69 @@ DatumToString(Datum datum, Oid dataType) return outputString; } + + +/* + * UnsetMetadataSyncedForAll sets the metadatasynced column of all metadata + * nodes to false. It returns true if it updated at least one node. + */ +static bool +UnsetMetadataSyncedForAll(void) +{ + bool updatedAtLeastOne = false; + Relation relation = NULL; + SysScanDesc scanDescriptor = NULL; + ScanKeyData scanKey[2]; + int scanKeyCount = 2; + bool indexOK = false; + HeapTuple heapTuple = NULL; + TupleDesc tupleDescriptor = NULL; + + relation = heap_open(DistNodeRelationId(), ExclusiveLock); + tupleDescriptor = RelationGetDescr(relation); + ScanKeyInit(&scanKey[0], Anum_pg_dist_node_hasmetadata, + BTEqualStrategyNumber, F_BOOLEQ, BoolGetDatum(true)); + ScanKeyInit(&scanKey[1], Anum_pg_dist_node_metadatasynced, + BTEqualStrategyNumber, F_BOOLEQ, BoolGetDatum(true)); + + scanDescriptor = systable_beginscan(relation, + InvalidOid, indexOK, + NULL, scanKeyCount, scanKey); + + heapTuple = systable_getnext(scanDescriptor); + if (HeapTupleIsValid(heapTuple)) + { + updatedAtLeastOne = true; + } + + while (HeapTupleIsValid(heapTuple)) + { + HeapTuple newHeapTuple = NULL; + Datum values[Natts_pg_dist_node]; + bool isnull[Natts_pg_dist_node]; + bool replace[Natts_pg_dist_node]; + + memset(replace, false, sizeof(replace)); + memset(isnull, false, sizeof(isnull)); + memset(values, 0, sizeof(values)); + + values[Anum_pg_dist_node_metadatasynced - 1] = BoolGetDatum(false); + replace[Anum_pg_dist_node_metadatasynced - 1] = true; + + newHeapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, + replace); + + CatalogTupleUpdate(relation, &newHeapTuple->t_self, newHeapTuple); + + CommandCounterIncrement(); + + heap_freetuple(newHeapTuple); + + heapTuple = systable_getnext(scanDescriptor); + } + 
systable_endscan(scanDescriptor); + heap_close(relation, NoLock); + + return updatedAtLeastOne; +} diff --git a/src/test/regress/expected/isolation_dump_global_wait_edges.out b/src/test/regress/expected/isolation_dump_global_wait_edges.out index 8cd8ffa15..84333599b 100644 --- a/src/test/regress/expected/isolation_dump_global_wait_edges.out +++ b/src/test/regress/expected/isolation_dump_global_wait_edges.out @@ -29,11 +29,11 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -274 273 f +266 265 f transactionnumberwaitingtransactionnumbers -273 -274 273 +265 +266 265 step s1-abort: ABORT; @@ -77,14 +77,14 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -278 277 f -279 277 f -279 278 t +270 269 f +271 269 f +271 270 t transactionnumberwaitingtransactionnumbers -277 -278 277 -279 277,278 +269 +270 269 +271 269,270 step s1-abort: ABORT; diff --git a/src/test/regress/expected/isolation_dump_global_wait_edges_0.out b/src/test/regress/expected/isolation_dump_global_wait_edges_0.out index 8dfe80286..f7a44bcb2 100644 --- a/src/test/regress/expected/isolation_dump_global_wait_edges_0.out +++ b/src/test/regress/expected/isolation_dump_global_wait_edges_0.out @@ -29,11 +29,11 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -275 274 f +267 266 f transactionnumberwaitingtransactionnumbers -274 -275 274 +266 +267 266 step s1-abort: ABORT; @@ -77,14 +77,14 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -279 278 f -280 278 f -280 279 t +271 270 f +272 270 f +272 271 t transactionnumberwaitingtransactionnumbers -278 -279 278 -280 278,279 +270 +271 270 +272 270,271 step s1-abort: ABORT; diff --git a/src/test/regress/expected/isolation_update_node.out b/src/test/regress/expected/isolation_update_node.out index eb08c1c8f..4cb171384 100644 --- 
a/src/test/regress/expected/isolation_update_node.out +++ b/src/test/regress/expected/isolation_update_node.out @@ -22,13 +22,14 @@ step s2-update-node-2: (select nodeid from pg_dist_node where nodeport = 57638), 'localhost', 58638); - -?column? - -1 + step s1-commit: COMMIT; +step s2-update-node-2: <... completed> +?column? + +1 step s1-show-nodes: SELECT nodeid, nodename, nodeport, isactive FROM pg_dist_node @@ -71,7 +72,9 @@ step s1-commit: COMMIT; step s2-update-node-1: <... completed> -error in steps s1-commit s2-update-node-1: ERROR: tuple concurrently updated +?column? + +1 step s2-abort: ABORT; diff --git a/src/test/regress/expected/multi_mx_master_update_node.out b/src/test/regress/expected/multi_mx_master_update_node.out index d8ab68b01..d7f687186 100644 --- a/src/test/regress/expected/multi_mx_master_update_node.out +++ b/src/test/regress/expected/multi_mx_master_update_node.out @@ -233,11 +233,9 @@ SELECT mark_node_readonly('localhost', :worker_2_port, TRUE); t (1 row) --- Now updating the other node should try syncing to worker 2, but instead of --- failure, it should just warn and mark the readonly node as not synced. +-- Now updating the other node will mark worker 2 as not synced. +BEGIN; SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345); -WARNING: cannot execute DELETE in a read-only transaction -CONTEXT: while executing command on localhost:57638 ?column? ---------- 1 @@ -250,6 +248,7 @@ SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid; 3 | t | f (2 rows) +COMMIT; -- worker_2 is out of sync, so further updates aren't sent to it and -- we shouldn't see the warnings. 
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 23456); @@ -285,13 +284,9 @@ SELECT mark_node_readonly('localhost', :worker_2_port, TRUE); t (1 row) --- Revert the nodeport of worker 1, metadata propagation to worker 2 should --- still fail, but after the failure, we should still be able to read from --- worker 2 in the same transaction! +-- Revert the nodeport of worker 1. BEGIN; SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port); -WARNING: cannot execute DELETE in a read-only transaction -CONTEXT: while executing command on localhost:57638 ?column? ---------- 1 @@ -354,6 +349,60 @@ SELECT verify_metadata('localhost', :worker_1_port), t | t (1 row) +-------------------------------------------------------------------------- +-- Test that master_update_node can appear in a prepared transaction. +-------------------------------------------------------------------------- +BEGIN; +SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345); + ?column? +---------- + 1 +(1 row) + +PREPARE TRANSACTION 'tx01'; +COMMIT PREPARED 'tx01'; +SELECT wait_until_metadata_sync(); + wait_until_metadata_sync +-------------------------- + +(1 row) + +SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid; + nodeid | hasmetadata | metadatasynced +--------+-------------+---------------- + 2 | t | f + 3 | t | t +(2 rows) + +BEGIN; +SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port); + ?column? 
+---------- + 1 +(1 row) + +PREPARE TRANSACTION 'tx01'; +COMMIT PREPARED 'tx01'; +SELECT wait_until_metadata_sync(); + wait_until_metadata_sync +-------------------------- + +(1 row) + +SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid; + nodeid | hasmetadata | metadatasynced +--------+-------------+---------------- + 2 | t | t + 3 | t | t +(2 rows) + +SELECT verify_metadata('localhost', :worker_1_port), + verify_metadata('localhost', :worker_2_port); + verify_metadata | verify_metadata +-----------------+----------------- + t | t +(1 row) + -- cleanup DROP TABLE dist_table_1, ref_table, dist_table_2; TRUNCATE pg_dist_colocation; diff --git a/src/test/regress/sql/multi_mx_master_update_node.sql b/src/test/regress/sql/multi_mx_master_update_node.sql index e11891d23..6b0afde97 100644 --- a/src/test/regress/sql/multi_mx_master_update_node.sql +++ b/src/test/regress/sql/multi_mx_master_update_node.sql @@ -131,10 +131,11 @@ INSERT INTO dist_table_2 SELECT i FROM generate_series(1, 100) i; SELECT mark_node_readonly('localhost', :worker_2_port, TRUE); --- Now updating the other node should try syncing to worker 2, but instead of --- failure, it should just warn and mark the readonly node as not synced. +-- Now updating the other node will mark worker 2 as not synced. +BEGIN; SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345); SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid; +COMMIT; -- worker_2 is out of sync, so further updates aren't sent to it and -- we shouldn't see the warnings. @@ -148,9 +149,7 @@ SELECT wait_until_metadata_sync(); -- Mark the node readonly again, so the following master_update_node warns SELECT mark_node_readonly('localhost', :worker_2_port, TRUE); --- Revert the nodeport of worker 1, metadata propagation to worker 2 should --- still fail, but after the failure, we should still be able to read from --- worker 2 in the same transaction! +-- Revert the nodeport of worker 1. 
BEGIN; SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port); SELECT count(*) FROM dist_table_2; @@ -173,6 +172,28 @@ BEGIN; SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345); ROLLBACK; +SELECT verify_metadata('localhost', :worker_1_port), + verify_metadata('localhost', :worker_2_port); + +-------------------------------------------------------------------------- +-- Test that master_update_node can appear in a prepared transaction. +-------------------------------------------------------------------------- +BEGIN; +SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345); +PREPARE TRANSACTION 'tx01'; +COMMIT PREPARED 'tx01'; + +SELECT wait_until_metadata_sync(); +SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid; + +BEGIN; +SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port); +PREPARE TRANSACTION 'tx01'; +COMMIT PREPARED 'tx01'; + +SELECT wait_until_metadata_sync(); +SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid; + SELECT verify_metadata('localhost', :worker_1_port), verify_metadata('localhost', :worker_2_port);