Make master_update_node citus-ha friendly.

pull/2928/head
Hadi Moshayedi 2019-09-17 16:36:51 -07:00
parent 76f3933b05
commit d2f2acc4b2
6 changed files with 183 additions and 224 deletions
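
The change matters for HA tooling: citus-ha repoints a failed worker by calling master_update_node() inside a prepared transaction, and Citus does not support combining a user-level PREPARE TRANSACTION with its own coordinated propagation to the metadata workers. After this commit the function only updates pg_dist_node on the coordinator, marks every synced metadata worker as unsynced, and leaves the propagation to the maintenance daemon. A minimal sketch of that usage, run on the coordinator; the node id, host, port, and transaction name are illustrative:

BEGIN;
-- repoint worker 2 to its promoted standby; only the coordinator's
-- pg_dist_node row changes here, no worker node is contacted
SELECT master_update_node(2, '10.0.0.7', 5432);
PREPARE TRANSACTION 'citus_ha_failover';
COMMIT PREPARED 'citus_ha_failover';

-- the maintenance daemon then resyncs pg_dist_node to the metadata workers,
-- retrying roughly every 5 seconds until metadatasynced is true everywhere
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;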


@ -67,9 +67,6 @@ typedef struct NodeMetadata
} NodeMetadata;
/* local function forward declarations */
static List * WorkerListDelete(List *workerList, uint32 nodeId);
static List * SyncedMetadataNodeList(void);
static void SyncDistNodeEntryToNodes(WorkerNode *workerNode, List *metadataWorkers);
static int ActivateNode(char *nodeName, int nodePort);
static void RemoveNodeFromCluster(char *nodeName, int32 nodePort);
static int AddNodeMetadata(char *nodeName, int32 nodePort, NodeMetadata
@ -84,9 +81,7 @@ static void DeleteNodeRow(char *nodename, int32 nodeport);
static List * ParseWorkerNodeFileAndRename(void);
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort);
static bool * SendOptionalCommandListToWorkers(List *workerNodeList,
List *commandList,
const char *nodeUser);
static bool UnsetMetadataSyncedForAll(void);
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(master_add_node);
@ -515,7 +510,6 @@ master_update_node(PG_FUNCTION_ARGS)
WorkerNode *workerNode = NULL;
WorkerNode *workerNodeWithSameAddress = NULL;
List *placementList = NIL;
List *metadataWorkersToSync = NIL;
BackgroundWorkerHandle *handle = NULL;
CheckCitusVersion(ERROR);
@ -593,25 +587,21 @@ master_update_node(PG_FUNCTION_ARGS)
/*
* Propagate the updated pg_dist_node entry to all metadata workers.
* The usual case is that the new node is in stand-by mode, so we don't
* sync the changes to the new node, and instead schedule it for being
* synced by the maintenance daemon.
* citus-ha uses master_update_node() in a prepared transaction, and
* we don't support coordinated prepared transactions, so we cannot
* propagate the changes to the worker nodes here. Instead we mark
* all metadata nodes as not-synced and ask maintenanced to do the
* propagation.
*
* It is possible that the maintenance daemon does the first resync too
* early, but that's fine, since this will start a retry loop with
* 5 second intervals until sync is complete.
*/
metadataWorkersToSync = SyncedMetadataNodeList();
if (workerNode->hasMetadata)
if (UnsetMetadataSyncedForAll())
{
metadataWorkersToSync = WorkerListDelete(metadataWorkersToSync, nodeId);
MarkNodeMetadataSynced(workerNode->workerName,
workerNode->workerPort, false);
TriggerMetadataSync(MyDatabaseId);
}
SyncDistNodeEntryToNodes(workerNode, metadataWorkersToSync);
if (handle != NULL)
{
/*
@ -625,176 +615,6 @@ master_update_node(PG_FUNCTION_ARGS)
}
/* SyncedMetadataNodeList returns a list of all synced metadata workers. */
static List *
SyncedMetadataNodeList(void)
{
List *metadataNodeList = NIL;
List *activePrimaries = ActivePrimaryNodeList(AccessShareLock);
ListCell *workerNodeCell = NULL;
foreach(workerNodeCell, activePrimaries)
{
WorkerNode *workerNode = lfirst(workerNodeCell);
if (workerNode->hasMetadata && workerNode->metadataSynced)
{
metadataNodeList = lappend(metadataNodeList, workerNode);
}
}
return metadataNodeList;
}
/*
* WorkerListDelete removes the worker node with the given id
* from the list of given workers.
*/
static List *
WorkerListDelete(List *workerList, uint32 nodeId)
{
List *filteredWorkerList = NIL;
ListCell *workerCell = NULL;
foreach(workerCell, workerList)
{
WorkerNode *workerNode = lfirst(workerCell);
if (workerNode->nodeId != nodeId)
{
filteredWorkerList = lappend(filteredWorkerList, workerNode);
}
}
return filteredWorkerList;
}
/*
* SyncDistNodeEntryToNodes synchronizes the corresponding entry for
* the given workerNode in pg_dist_node metadata table of given
* metadataWorkers. If syncing to a node fails, pg_dist_node.metadatasynced
* is set to false.
*/
static void
SyncDistNodeEntryToNodes(WorkerNode *nodeToSync, List *metadataWorkers)
{
ListCell *workerCell = NULL;
char *extensionOwner = CitusExtensionOwnerName();
char *nodeDeleteCommand = NodeDeleteCommand(nodeToSync->nodeId);
char *nodeInsertCommand = NodeListInsertCommand(list_make1(nodeToSync));
List *commandList = list_make2(nodeDeleteCommand, nodeInsertCommand);
bool *workerFailed = SendOptionalCommandListToWorkers(metadataWorkers, commandList,
extensionOwner);
int workerIndex = 0;
foreach(workerCell, metadataWorkers)
{
WorkerNode *workerNode = lfirst(workerCell);
if (workerFailed[workerIndex])
{
MarkNodeMetadataSynced(workerNode->workerName, workerNode->workerPort, false);
}
workerIndex++;
}
}
/*
* SendOptionalCommandListToWorkers sends the given command list to the given
* worker list, and returns a bool[] indicating whether execution of the command
* list failed at a worker or not.
*
* If execution fails at a worker because of connection error, this function just
* emits a warning for that node. If execution fails because of a result error,
* the node is rolled-back to the status before this function so further queries
* can be sent to the node.
*/
static bool *
SendOptionalCommandListToWorkers(List *workerNodeList, List *commandList,
const char *nodeUser)
{
List *connectionList = NIL;
ListCell *commandCell = NULL;
ListCell *connectionCell = NULL;
bool *workerFailed = palloc0(sizeof(workerNodeList));
char *beginSavepointCommand = "SAVEPOINT sp_node_metadata";
char *rollbackSavepointCommand = "ROLLBACK TO SAVEPOINT sp_node_metadata";
char *releaseSavepointCommand = "RELEASE SAVEPOINT sp_node_metadata";
BeginOrContinueCoordinatedTransaction();
/* open connections in parallel */
connectionList = StartWorkerListConnections(workerNodeList, 0, nodeUser, NULL);
FinishConnectionListEstablishment(connectionList);
RemoteTransactionsBeginIfNecessary(connectionList);
commandList = lcons(beginSavepointCommand, commandList);
commandList = lappend(commandList, releaseSavepointCommand);
foreach(commandCell, commandList)
{
char *command = lfirst(commandCell);
int workerIndex = 0;
foreach(connectionCell, connectionList)
{
MultiConnection *connection = lfirst(connectionCell);
if (!workerFailed[workerIndex])
{
if (SendRemoteCommand(connection, command) == 0)
{
ReportConnectionError(connection, WARNING);
workerFailed[workerIndex] = true;
}
}
workerIndex++;
}
workerIndex = 0;
foreach(connectionCell, connectionList)
{
MultiConnection *connection = lfirst(connectionCell);
bool raiseInterrupts = true;
PGresult *commandResult = NULL;
bool responseOK = false;
if (workerFailed[workerIndex])
{
workerIndex++;
continue;
}
commandResult = GetRemoteCommandResult(connection, raiseInterrupts);
responseOK = IsResponseOK(commandResult);
if (!responseOK)
{
ReportResultError(connection, commandResult, WARNING);
workerFailed[workerIndex] = true;
PQclear(commandResult);
ForgetResults(connection);
ExecuteOptionalRemoteCommand(connection, rollbackSavepointCommand, NULL);
}
else
{
PQclear(commandResult);
ForgetResults(connection);
}
workerIndex++;
}
}
return workerFailed;
}
static void
UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort)
{
@ -1783,3 +1603,69 @@ DatumToString(Datum datum, Oid dataType)
return outputString;
}
/*
* UnsetMetadataSyncedForAll sets the metadatasynced column of all metadata
* nodes to false. It returns true if it updated at least one node.
*/
static bool
UnsetMetadataSyncedForAll(void)
{
bool updatedAtLeastOne = false;
Relation relation = NULL;
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[2];
int scanKeyCount = 2;
bool indexOK = false;
HeapTuple heapTuple = NULL;
TupleDesc tupleDescriptor = NULL;
relation = heap_open(DistNodeRelationId(), ExclusiveLock);
tupleDescriptor = RelationGetDescr(relation);
ScanKeyInit(&scanKey[0], Anum_pg_dist_node_hasmetadata,
BTEqualStrategyNumber, F_BOOLEQ, BoolGetDatum(true));
ScanKeyInit(&scanKey[1], Anum_pg_dist_node_metadatasynced,
BTEqualStrategyNumber, F_BOOLEQ, BoolGetDatum(true));
scanDescriptor = systable_beginscan(relation,
InvalidOid, indexOK,
NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
if (HeapTupleIsValid(heapTuple))
{
updatedAtLeastOne = true;
}
while (HeapTupleIsValid(heapTuple))
{
HeapTuple newHeapTuple = NULL;
Datum values[Natts_pg_dist_node];
bool isnull[Natts_pg_dist_node];
bool replace[Natts_pg_dist_node];
memset(replace, false, sizeof(replace));
memset(isnull, false, sizeof(isnull));
memset(values, 0, sizeof(values));
values[Anum_pg_dist_node_metadatasynced - 1] = BoolGetDatum(false);
replace[Anum_pg_dist_node_metadatasynced - 1] = true;
newHeapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull,
replace);
CatalogTupleUpdate(relation, &newHeapTuple->t_self, newHeapTuple);
CommandCounterIncrement();
heap_freetuple(newHeapTuple);
heapTuple = systable_getnext(scanDescriptor);
}
systable_endscan(scanDescriptor);
heap_close(relation, NoLock);
return updatedAtLeastOne;
}
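
UnsetMetadataSyncedForAll only flips catalog flags (it opens pg_dist_node with ExclusiveLock and walks it with a sequential scan, which is fine for a table holding one row per node), so the work left for the maintenance daemon is visible straight from the catalog. An illustrative query to list the metadata workers that still await a resync after a master_update_node call:

-- metadata workers invalidated by master_update_node and not yet resynced
SELECT nodeid, nodename, nodeport
FROM pg_dist_node
WHERE hasmetadata AND NOT metadatasynced;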


@ -29,11 +29,11 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
274 273 f
266 265 f
transactionnumberwaitingtransactionnumbers
273
274 273
265
266 265
step s1-abort:
ABORT;
@ -77,14 +77,14 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
278 277 f
279 277 f
279 278 t
270 269 f
271 269 f
271 270 t
transactionnumberwaitingtransactionnumbers
277
278 277
279 277,278
269
270 269
271 269,270
step s1-abort:
ABORT;


@ -29,11 +29,11 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
275 274 f
267 266 f
transactionnumberwaitingtransactionnumbers
274
275 274
266
267 266
step s1-abort:
ABORT;
@ -77,14 +77,14 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
279 278 f
280 278 f
280 279 t
271 270 f
272 270 f
272 271 t
transactionnumberwaitingtransactionnumbers
278
279 278
280 278,279
270
271 270
272 270,271
step s1-abort:
ABORT;


@ -22,13 +22,14 @@ step s2-update-node-2:
(select nodeid from pg_dist_node where nodeport = 57638),
'localhost',
58638);
?column?
1
<waiting ...>
step s1-commit:
COMMIT;
step s2-update-node-2: <... completed>
?column?
1
step s1-show-nodes:
SELECT nodeid, nodename, nodeport, isactive
FROM pg_dist_node
@ -71,7 +72,9 @@ step s1-commit:
COMMIT;
step s2-update-node-1: <... completed>
error in steps s1-commit s2-update-node-1: ERROR: tuple concurrently updated
?column?
1
step s2-abort:
ABORT;


@ -233,11 +233,9 @@ SELECT mark_node_readonly('localhost', :worker_2_port, TRUE);
t
(1 row)
-- Now updating the other node should try syncing to worker 2, but instead of
-- failure, it should just warn and mark the readonly node as not synced.
-- Now updating the other node will mark worker 2 as not synced.
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
WARNING: cannot execute DELETE in a read-only transaction
CONTEXT: while executing command on localhost:57638
?column?
----------
1
@ -250,6 +248,7 @@ SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;
3 | t | f
(2 rows)
COMMIT;
-- worker_2 is out of sync, so further updates aren't sent to it and
-- we shouldn't see the warnings.
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 23456);
@ -285,13 +284,9 @@ SELECT mark_node_readonly('localhost', :worker_2_port, TRUE);
t
(1 row)
-- Revert the nodeport of worker 1, metadata propagation to worker 2 should
-- still fail, but after the failure, we should still be able to read from
-- worker 2 in the same transaction!
-- Revert the nodeport of worker 1.
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
WARNING: cannot execute DELETE in a read-only transaction
CONTEXT: while executing command on localhost:57638
?column?
----------
1
@ -354,6 +349,60 @@ SELECT verify_metadata('localhost', :worker_1_port),
t | t
(1 row)
--------------------------------------------------------------------------
-- Test that master_update_node can appear in a prepared transaction.
--------------------------------------------------------------------------
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
?column?
----------
1
(1 row)
PREPARE TRANSACTION 'tx01';
COMMIT PREPARED 'tx01';
SELECT wait_until_metadata_sync();
wait_until_metadata_sync
--------------------------
(1 row)
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;
nodeid | hasmetadata | metadatasynced
--------+-------------+----------------
2 | t | f
3 | t | t
(2 rows)
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
?column?
----------
1
(1 row)
PREPARE TRANSACTION 'tx01';
COMMIT PREPARED 'tx01';
SELECT wait_until_metadata_sync();
wait_until_metadata_sync
--------------------------
(1 row)
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;
nodeid | hasmetadata | metadatasynced
--------+-------------+----------------
2 | t | t
3 | t | t
(2 rows)
SELECT verify_metadata('localhost', :worker_1_port),
verify_metadata('localhost', :worker_2_port);
verify_metadata | verify_metadata
-----------------+-----------------
t | t
(1 row)
-- cleanup
DROP TABLE dist_table_1, ref_table, dist_table_2;
TRUNCATE pg_dist_colocation;


@ -131,10 +131,11 @@ INSERT INTO dist_table_2 SELECT i FROM generate_series(1, 100) i;
SELECT mark_node_readonly('localhost', :worker_2_port, TRUE);
-- Now updating the other node should try syncing to worker 2, but instead of
-- failure, it should just warn and mark the readonly node as not synced.
-- Now updating the other node will mark worker 2 as not synced.
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;
COMMIT;
-- worker_2 is out of sync, so further updates aren't sent to it and
-- we shouldn't see the warnings.
@ -148,9 +149,7 @@ SELECT wait_until_metadata_sync();
-- Mark the node readonly again, so the following master_update_node warns
SELECT mark_node_readonly('localhost', :worker_2_port, TRUE);
-- Revert the nodeport of worker 1, metadata propagation to worker 2 should
-- still fail, but after the failure, we should still be able to read from
-- worker 2 in the same transaction!
-- Revert the nodeport of worker 1.
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
SELECT count(*) FROM dist_table_2;
@ -173,6 +172,28 @@ BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
ROLLBACK;
SELECT verify_metadata('localhost', :worker_1_port),
verify_metadata('localhost', :worker_2_port);
--------------------------------------------------------------------------
-- Test that master_update_node can appear in a prepared transaction.
--------------------------------------------------------------------------
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', 12345);
PREPARE TRANSACTION 'tx01';
COMMIT PREPARED 'tx01';
SELECT wait_until_metadata_sync();
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;
BEGIN;
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
PREPARE TRANSACTION 'tx01';
COMMIT PREPARED 'tx01';
SELECT wait_until_metadata_sync();
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node ORDER BY nodeid;
SELECT verify_metadata('localhost', :worker_1_port),
verify_metadata('localhost', :worker_2_port);