Move placement deletion from disable node to activate node

We prefer the background daemon to sync node metadata only. That's
why we move placement metadata changes from disable node to
activate node. This ensures that disable node changes only node
metadata, whereas activate node syncs all the metadata changes. In
essence, we already expect all nodes to be up when a node is
activated, so this does not change the behavior much.
pull/5597/head
Onder Kalaci 2022-01-05 13:08:23 +01:00
parent 9edfbe7718
commit 9f2d9e1487
10 changed files with 98 additions and 53 deletions
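In practice, the new flow looks like the sketch below; it uses only UDFs that the tests in this commit exercise, and the host/port values are illustrative.

-- disable only flips node metadata on the coordinator
SELECT citus_disable_node('localhost', 9701);
-- the background daemon propagates the node-metadata change when it can
SELECT wait_until_metadata_sync(30000);
-- activation deletes stale replicated-table placements for the node's
-- group and syncs all metadata; every node is expected to be up here
SELECT citus_activate_node('localhost', 9701);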


@@ -496,16 +496,6 @@ citus_disable_node(PG_FUNCTION_ARGS)
 								   workerNode->workerName,
 								   nodePort)));
 		}
-
-		/*
-		 * Delete replicated table placements from the coordinator's metadata,
-		 * but not remotely. That is because one or more of the remote
-		 * nodes might be down. Instead, we let the background worker
-		 * sync the metadata when possible.
-		 */
-		bool forceRemoteDelete = false;
-		DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId,
-														forceRemoteDelete);
 	}

 	TransactionModifiedNodeMetadata = true;
@@ -515,6 +505,12 @@ citus_disable_node(PG_FUNCTION_ARGS)
 	 * active nodes get the metadata updates. We defer this operation to the
 	 * background worker to make it possible to disable nodes when multiple
 	 * nodes are down.
+	 *
+	 * Note that the active placements reside on the active nodes. Hence, when
+	 * Citus finds active placements, it filters out the placements that are on
+	 * the disabled nodes. That's why we don't have to change/sync placement
+	 * metadata at this point. Instead, we defer that to citus_activate_node(),
+	 * where we expect all nodes to be up and running.
 	 */
 	if (UnsetMetadataSyncedForAll())
 	{
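The filtering the new comment describes can be made explicit in SQL; the regression tests below use exactly this predicate (shown here as a standalone, illustrative query):

-- only placements on active primary nodes count as active placements
SELECT p.shardid, p.shardstate
FROM pg_dist_placement p JOIN pg_dist_node n USING (groupid)
WHERE n.isactive AND n.noderole = 'primary'
ORDER BY p.placementid;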
@@ -868,6 +864,22 @@ ActivateNode(char *nodeName, int nodePort)
 		ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort)));
 	}

+	/*
+	 * Delete existing reference and replicated table placements on the
+	 * given groupId if the group has been disabled earlier (e.g., isActive
+	 * set to false).
+	 *
+	 * Sync the metadata changes to all existing metadata nodes irrespective
+	 * of the current nodes' metadata sync state. We expect all nodes up
+	 * and running when another node is activated.
+	 */
+	if (!workerNode->isActive && NodeIsPrimary(workerNode))
+	{
+		bool localOnly = false;
+		DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId,
+														localOnly);
+	}
+
 	workerNode =
 		SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive,
 								 BoolGetDatum(isActive));
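Until a disabled node is re-activated, its group may therefore carry stale placement rows. A sketch for spotting them (not part of the change; the replicated-table rows among these are what the new block in ActivateNode deletes):

-- placements that still point at disabled primary groups
SELECT p.placementid, p.shardid, n.nodename, n.nodeport
FROM pg_dist_placement p JOIN pg_dist_node n USING (groupid)
WHERE NOT n.isactive AND n.noderole = 'primary'
ORDER BY p.placementid;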
@@ -1353,9 +1365,9 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort)
 	 * Delete reference table placements so they are not taken into account
 	 * for the check if there are placements after this.
 	 */
-	bool forceRemoteDelete = true;
+	bool localOnly = false;
 	DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId,
-													forceRemoteDelete);
+													localOnly);

 	/*
 	 * Secondary nodes are read-only, never 2PC is used.


@@ -432,7 +432,7 @@ CreateReferenceTableColocationId()
  * all placements from pg_dist_placement table for given group.
  */
 void
-DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool forceRemoteDelete)
+DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool localOnly)
 {
 	List *referenceTableList = CitusTableTypeIdList(REFERENCE_TABLE);
 	List *replicatedMetadataSyncedDistributedTableList =
@@ -469,7 +469,7 @@ DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool forceRemoteDelete)
 		DeleteShardPlacementRow(placement->placementId);

-		if (forceRemoteDelete)
+		if (!localOnly)
 		{
 			resetStringInfo(deletePlacementCommand);
 			appendStringInfo(deletePlacementCommand,
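Note that the flag is renamed and inverted: the old forceRemoteDelete = true corresponds to the new localOnly = false. One way to observe a local-only delete (a sketch; the shard ID is a placeholder) is to compare the coordinator's placement metadata with the workers':

-- coordinator-local placement count for one shard
SELECT count(*) FROM pg_dist_placement WHERE shardid = 101500;
-- after a local-only delete, metadata workers keep their rows until
-- the background daemon syncs the metadata
SELECT nodename, nodeport, result
FROM run_command_on_workers(
    $$SELECT count(*) FROM pg_dist_placement WHERE shardid = 101500$$);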


@@ -21,8 +21,8 @@
 extern void EnsureReferenceTablesExistOnAllNodes(void);
 extern void EnsureReferenceTablesExistOnAllNodesExtended(char transferMode);
 extern uint32 CreateReferenceTableColocationId(void);
-extern void DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool
-															forceRemoteDelete);
+extern void DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId,
+															bool localOnly);
 extern int CompareOids(const void *leftElement, const void *rightElement);
 extern int ReferenceTableReplicationFactor(void);
 extern void ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort);


@@ -74,8 +74,8 @@ ORDER BY 1, 2;
 (1 row)

 SELECT shardid, shardstate
-FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
-WHERE s.logicalrelid = 'user_table'::regclass
+FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid)
+WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive
 ORDER BY placementid;
  shardid | shardstate
 ---------------------------------------------------------------------
@@ -91,8 +91,8 @@ ORDER BY 1, 2;
 (1 row)

 SELECT shardid, shardstate
-FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
-WHERE s.logicalrelid = 'user_table'::regclass
+FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid)
+WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive
 ORDER BY placementid;
  shardid | shardstate
 ---------------------------------------------------------------------
@@ -120,8 +120,8 @@ ORDER BY 1, 2;
 (1 row)

 SELECT shardid, shardstate
-FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
-WHERE s.logicalrelid = 'user_table'::regclass
+FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid)
+WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive
 ORDER BY placementid;
  shardid | shardstate
 ---------------------------------------------------------------------
@@ -157,8 +157,8 @@ ORDER BY 1, 2;
 (1 row)

 SELECT shardid, shardstate
-FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
-WHERE s.logicalrelid = 'user_table'::regclass
+FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid)
+WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive
 ORDER BY placementid;
  shardid | shardstate
 ---------------------------------------------------------------------
@@ -181,8 +181,8 @@ SELECT master_remove_node('localhost', :worker_2_proxy_port);
 (1 row)

 SELECT shardid, shardstate
-FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
-WHERE s.logicalrelid = 'user_table'::regclass
+FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid)
+WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive
 ORDER BY placementid;
  shardid | shardstate
 ---------------------------------------------------------------------
@@ -212,8 +212,8 @@ ORDER BY 1, 2;
 (2 rows)

 SELECT shardid, shardstate
-FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
-WHERE s.logicalrelid = 'user_table'::regclass
+FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid)
+WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive
 ORDER BY placementid;
  shardid | shardstate
 ---------------------------------------------------------------------


@@ -510,6 +510,7 @@ ERROR: there is a shard placement in node group 6 but there are no nodes in that group
 SELECT * INTO old_placements FROM pg_dist_placement WHERE groupid = :worker_2_group;
 DELETE FROM pg_dist_placement WHERE groupid = :worker_2_group;
 SELECT master_add_node('localhost', :worker_2_port) AS new_node \gset
+WARNING: could not find any shard placements for shardId 1220001
 WARNING: citus.enable_object_propagation is off, not creating distributed objects on worker
 DETAIL: distributed objects are only kept in sync when citus.enable_object_propagation is set to on. Newly activated nodes will not get these objects created
 WARNING: could not find any shard placements for shardId 1220001


@@ -641,19 +641,32 @@ SELECT verify_metadata('localhost', :worker_1_port),
 ---------------------------------------------------------------------
 -- Don't drop the reference table so it has shards on the nodes being disabled
 DROP TABLE dist_table_1, dist_table_2;
-SELECT pg_catalog.citus_disable_node_and_wait('localhost', :worker_2_port);
+SELECT pg_catalog.citus_disable_node('localhost', :worker_2_port);
 NOTICE: Node localhost:xxxxx has active shard placements. Some queries may fail after this operation. Use SELECT citus_activate_node('localhost', 57638) to activate this node back.
- citus_disable_node_and_wait
+ citus_disable_node
 ---------------------------------------------------------------------

 (1 row)

-SELECT verify_metadata('localhost', :worker_1_port);
- verify_metadata
+SELECT wait_until_metadata_sync(30000);
+ wait_until_metadata_sync
 ---------------------------------------------------------------------
- t
+
 (1 row)

+-- show that node metadata is the same
+-- note that we cannot use verify_metadata here
+-- because there are several shards/placements
+-- in the metadata that are manually modified on the coordinator
+-- not on the worker, and pg_catalog.citus_disable_node does
+-- not sync the metadata
+SELECT result FROM run_command_on_workers($$SELECT jsonb_agg(row_to_json(row(pg_dist_node.*))) FROM pg_dist_node$$) WHERE nodeport=:worker_1_port
+EXCEPT
+SELECT jsonb_agg(row_to_json(row(pg_dist_node.*)))::text FROM pg_dist_node;
+ result
+---------------------------------------------------------------------
+(0 rows)
+
 SELECT 1 FROM master_activate_node('localhost', :worker_2_port);
  ?column?
 ---------------------------------------------------------------------


@@ -32,16 +32,20 @@ SELECT public.wait_until_metadata_sync();
 (1 row)

--- the placement should be removed both from the coordinator
+-- the active placement should NOT be removed from the coordinator
 -- and from the workers
-SELECT count(*) FROM pg_dist_placement WHERE shardid IN (101500, 101501, 101502);
+SELECT count(*) FROM pg_dist_placement p JOIN pg_dist_node n USING(groupid)
+	WHERE n.isactive AND n.noderole = 'primary'
+	AND p.shardid IN (101500, 101501, 101502);
  count
 ---------------------------------------------------------------------
     3
 (1 row)

 \c - - - :worker_1_port
-SELECT count(*) FROM pg_dist_placement WHERE shardid IN (101500, 101501, 101502);
+SELECT count(*) FROM pg_dist_placement p JOIN pg_dist_node n USING(groupid)
+	WHERE n.isactive AND n.noderole = 'primary'
+	AND p.shardid IN (101500, 101501, 101502);
  count
 ---------------------------------------------------------------------
     3


@@ -37,8 +37,8 @@ SELECT * FROM master_get_active_worker_nodes()
 ORDER BY 1, 2;

 SELECT shardid, shardstate
-FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
-WHERE s.logicalrelid = 'user_table'::regclass
+FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid)
+WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive
 ORDER BY placementid;

 -- verify node is not activated
@@ -46,8 +46,8 @@ SELECT * FROM master_get_active_worker_nodes()
 ORDER BY 1, 2;

 SELECT shardid, shardstate
-FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
-WHERE s.logicalrelid = 'user_table'::regclass
+FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid)
+WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive
 ORDER BY placementid;

 -- fail create schema command
@@ -60,8 +60,8 @@ SELECT * FROM master_get_active_worker_nodes()
 ORDER BY 1, 2;

 SELECT shardid, shardstate
-FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
-WHERE s.logicalrelid = 'user_table'::regclass
+FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid)
+WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive
 ORDER BY placementid;

 BEGIN;
@@ -81,8 +81,8 @@ SELECT * FROM master_get_active_worker_nodes()
 ORDER BY 1, 2;

 SELECT shardid, shardstate
-FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
-WHERE s.logicalrelid = 'user_table'::regclass
+FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid)
+WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive
 ORDER BY placementid;

 -- test master_add_inactive_node
@@ -93,8 +93,8 @@ SELECT master_add_inactive_node('localhost', :worker_2_proxy_port);
 SELECT master_remove_node('localhost', :worker_2_proxy_port);

 SELECT shardid, shardstate
-FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
-WHERE s.logicalrelid = 'user_table'::regclass
+FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid)
+WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive
 ORDER BY placementid;

 -- reset cluster to original state
@@ -106,8 +106,8 @@ SELECT * FROM master_get_active_worker_nodes()
 ORDER BY 1, 2;

 SELECT shardid, shardstate
-FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
-WHERE s.logicalrelid = 'user_table'::regclass
+FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid)
+WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive
 ORDER BY placementid;

 SELECT citus.mitmproxy('conn.allow()');


@@ -286,8 +286,18 @@ SELECT verify_metadata('localhost', :worker_1_port),
 -- Don't drop the reference table so it has shards on the nodes being disabled
 DROP TABLE dist_table_1, dist_table_2;
-SELECT pg_catalog.citus_disable_node_and_wait('localhost', :worker_2_port);
-SELECT verify_metadata('localhost', :worker_1_port);
+SELECT pg_catalog.citus_disable_node('localhost', :worker_2_port);
+SELECT wait_until_metadata_sync(30000);
+
+-- show that node metadata is the same
+-- note that we cannot use verify_metadata here
+-- because there are several shards/placements
+-- in the metadata that are manually modified on the coordinator
+-- not on the worker, and pg_catalog.citus_disable_node does
+-- not sync the metadata
+SELECT result FROM run_command_on_workers($$SELECT jsonb_agg(row_to_json(row(pg_dist_node.*))) FROM pg_dist_node$$) WHERE nodeport=:worker_1_port
+EXCEPT
+SELECT jsonb_agg(row_to_json(row(pg_dist_node.*)))::text FROM pg_dist_node;

 SELECT 1 FROM master_activate_node('localhost', :worker_2_port);
 SELECT verify_metadata('localhost', :worker_1_port);


@@ -18,12 +18,17 @@ INSERT INTO ref SELECT i,i FROM generate_series(0,10)i;
 SELECT citus_disable_node('localhost', :worker_2_port, true);
 SELECT public.wait_until_metadata_sync();

--- the placement should be removed both from the coordinator
+-- the active placement should NOT be removed from the coordinator
 -- and from the workers
-SELECT count(*) FROM pg_dist_placement WHERE shardid IN (101500, 101501, 101502);
+SELECT count(*) FROM pg_dist_placement p JOIN pg_dist_node n USING(groupid)
+	WHERE n.isactive AND n.noderole = 'primary'
+	AND p.shardid IN (101500, 101501, 101502);

 \c - - - :worker_1_port
-SELECT count(*) FROM pg_dist_placement WHERE shardid IN (101500, 101501, 101502);
+SELECT count(*) FROM pg_dist_placement p JOIN pg_dist_node n USING(groupid)
+	WHERE n.isactive AND n.noderole = 'primary'
+	AND p.shardid IN (101500, 101501, 101502);

 SET search_path TO disable_node_with_replicated_tables;
 -- should be able to ingest data from both the worker and the coordinator