Merge pull request #5597 from citusdata/move_placement_deletions

Move placement deletion from disable node to activate node
pull/5579/head
Önder Kalacı 2022-01-07 10:03:35 +01:00 committed by GitHub
commit b6cf5a969b
10 changed files with 98 additions and 53 deletions
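In short, this commit moves placement cleanup out of citus_disable_node(): disabling a node now only marks it inactive and leaves placement metadata in place (the background worker propagates the node metadata when it can), while citus_activate_node() deletes the stale replicated-table placements for the node's group before re-activating it. A minimal sketch of the resulting flow, assuming a reference table with a placement on worker_2 (ports given via the usual psql test variables, purely illustrative):

-- mark the unreachable node inactive; its placement rows stay in the catalog
SELECT citus_disable_node('localhost', :worker_2_port);
-- let the background worker propagate the node metadata
SELECT wait_until_metadata_sync(30000);
-- once the node is reachable again, activation drops the stale
-- replicated placements for its group and syncs metadata everywhere
SELECT citus_activate_node('localhost', :worker_2_port);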


@@ -496,16 +496,6 @@ citus_disable_node(PG_FUNCTION_ARGS)
workerNode->workerName,
nodePort)));
}
/*
* Delete replicated table placements from the coordinator's metadata,
* but not remotely. That is because one or more of the remote
* nodes might be down. Instead, we let the background worker
* sync the metadata when possible.
*/
bool forceRemoteDelete = false;
DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId,
forceRemoteDelete);
}
TransactionModifiedNodeMetadata = true;
@@ -515,6 +505,12 @@ citus_disable_node(PG_FUNCTION_ARGS)
* active nodes get the metadata updates. We defer this operation to the
* background worker to make it possible to disable nodes when
* multiple nodes are down.
*
* Note that the active placements reside on the active nodes. Hence, when
* Citus finds active placements, it filters out the placements that are on
the disabled nodes. That's why we don't have to change/sync placement
* metadata at this point. Instead, we defer that to citus_activate_node()
* where we expect all nodes up and running.
*/
if (UnsetMetadataSyncedForAll())
{
@@ -868,6 +864,22 @@ ActivateNode(char *nodeName, int nodePort)
ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort)));
}
/*
* Delete existing reference and replicated table placements on the
* given groupId if the group has been disabled earlier (i.e., isActive
* set to false).
*
* Sync the metadata changes to all existing metadata nodes irrespective
* of the current nodes' metadata sync state. We expect all nodes up
* and running when another node is activated.
*/
if (!workerNode->isActive && NodeIsPrimary(workerNode))
{
bool localOnly = false;
DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId,
localOnly);
}
workerNode =
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive,
BoolGetDatum(isActive));
@@ -1353,9 +1365,9 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort)
* Delete reference table placements so they are not taken into account
* for the check if there are placements after this.
*/
bool forceRemoteDelete = true;
bool localOnly = false;
DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId,
forceRemoteDelete);
localOnly);
/*
* Secondary nodes are read-only; 2PC is never used.

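A consequence that drives the test changes below: placement rows for a disabled node linger in pg_dist_placement until the node is activated again, so queries about live placements must join pg_dist_node and filter on isactive. A hedged spot-check from the coordinator, using only catalog columns that appear in the tests:

-- right after citus_disable_node, rows for the disabled group are still here
SELECT count(*)
FROM pg_dist_placement p JOIN pg_dist_node n USING (groupid)
WHERE n.nodeport = :worker_2_port AND NOT n.isactive;
-- after citus_activate_node, the stale replicated placements are gone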

@@ -432,7 +432,7 @@ CreateReferenceTableColocationId()
* all placements from the pg_dist_placement table for the given group.
*/
void
DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool forceRemoteDelete)
DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool localOnly)
{
List *referenceTableList = CitusTableTypeIdList(REFERENCE_TABLE);
List *replicatedMetadataSyncedDistributedTableList =
@@ -469,7 +469,7 @@ DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool forceRemoteD
DeleteShardPlacementRow(placement->placementId);
if (forceRemoteDelete)
if (!localOnly)
{
resetStringInfo(deletePlacementCommand);
appendStringInfo(deletePlacementCommand,

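Note the polarity flip that comes with the rename: the old forceRemoteDelete = true corresponds to localOnly = false, i.e. the placement deletions are also executed on every metadata-syncing node rather than only on the coordinator. One way to confirm coordinator and workers agree afterwards, borrowing the run_command_on_workers pattern from the tests (shard IDs are the illustrative ones used there):

SELECT result FROM run_command_on_workers(
    $$SELECT count(*) FROM pg_dist_placement WHERE shardid IN (101500, 101501, 101502)$$);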

@@ -21,8 +21,8 @@
extern void EnsureReferenceTablesExistOnAllNodes(void);
extern void EnsureReferenceTablesExistOnAllNodesExtended(char transferMode);
extern uint32 CreateReferenceTableColocationId(void);
extern void DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool
forceRemoteDelete);
extern void DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId,
bool localOnly);
extern int CompareOids(const void *leftElement, const void *rightElement);
extern int ReferenceTableReplicationFactor(void);
extern void ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort);


@@ -74,8 +74,8 @@ ORDER BY 1, 2;
(1 row)
SELECT shardid, shardstate
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
WHERE s.logicalrelid = 'user_table'::regclass
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid)
WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive
ORDER BY placementid;
shardid | shardstate
---------------------------------------------------------------------
@@ -91,8 +91,8 @@ ORDER BY 1, 2;
(1 row)
SELECT shardid, shardstate
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
WHERE s.logicalrelid = 'user_table'::regclass
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid)
WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive
ORDER BY placementid;
shardid | shardstate
---------------------------------------------------------------------
@@ -120,8 +120,8 @@ ORDER BY 1, 2;
(1 row)
SELECT shardid, shardstate
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
WHERE s.logicalrelid = 'user_table'::regclass
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid)
WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive
ORDER BY placementid;
shardid | shardstate
---------------------------------------------------------------------
@@ -157,8 +157,8 @@ ORDER BY 1, 2;
(1 row)
SELECT shardid, shardstate
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
WHERE s.logicalrelid = 'user_table'::regclass
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid)
WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive
ORDER BY placementid;
shardid | shardstate
---------------------------------------------------------------------
@@ -181,8 +181,8 @@ SELECT master_remove_node('localhost', :worker_2_proxy_port);
(1 row)
SELECT shardid, shardstate
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
WHERE s.logicalrelid = 'user_table'::regclass
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid)
WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive
ORDER BY placementid;
shardid | shardstate
---------------------------------------------------------------------
@@ -212,8 +212,8 @@ ORDER BY 1, 2;
(2 rows)
SELECT shardid, shardstate
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
WHERE s.logicalrelid = 'user_table'::regclass
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid)
WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive
ORDER BY placementid;
shardid | shardstate
---------------------------------------------------------------------


@@ -510,6 +510,7 @@ ERROR: there is a shard placement in node group 6 but there are no nodes in tha
SELECT * INTO old_placements FROM pg_dist_placement WHERE groupid = :worker_2_group;
DELETE FROM pg_dist_placement WHERE groupid = :worker_2_group;
SELECT master_add_node('localhost', :worker_2_port) AS new_node \gset
WARNING: could not find any shard placements for shardId 1220001
WARNING: citus.enable_object_propagation is off, not creating distributed objects on worker
DETAIL: distributed objects are only kept in sync when citus.enable_object_propagation is set to on. Newly activated nodes will not get these objects created
WARNING: could not find any shard placements for shardId 1220001


@@ -641,19 +641,32 @@ SELECT verify_metadata('localhost', :worker_1_port),
---------------------------------------------------------------------
-- Don't drop the reference table so it has shards on the nodes being disabled
DROP TABLE dist_table_1, dist_table_2;
SELECT pg_catalog.citus_disable_node_and_wait('localhost', :worker_2_port);
SELECT pg_catalog.citus_disable_node('localhost', :worker_2_port);
NOTICE: Node localhost:xxxxx has active shard placements. Some queries may fail after this operation. Use SELECT citus_activate_node('localhost', 57638) to activate this node back.
citus_disable_node_and_wait
citus_disable_node
---------------------------------------------------------------------
(1 row)
SELECT verify_metadata('localhost', :worker_1_port);
verify_metadata
SELECT wait_until_metadata_sync(30000);
wait_until_metadata_sync
---------------------------------------------------------------------
t
(1 row)
-- show that the node metadata is the same
-- note that we cannot use verify_metadata here
-- because there are several shards/placements in the
-- metadata that were modified manually on the coordinator
-- but not on the worker, and pg_catalog.citus_disable_node
-- does not sync the metadata
SELECT result FROM run_command_on_workers($$SELECT jsonb_agg(row_to_json(row(pg_dist_node.*))) FROM pg_dist_node$$) WHERE nodeport=:worker_1_port
EXCEPT
SELECT jsonb_agg(row_to_json(row(pg_dist_node.*)))::text FROM pg_dist_node;
result
---------------------------------------------------------------------
(0 rows)
SELECT 1 FROM master_activate_node('localhost', :worker_2_port);
?column?
---------------------------------------------------------------------


@@ -32,16 +32,20 @@ SELECT public.wait_until_metadata_sync();
(1 row)
-- the placement should be removed both from the coordinator
-- the active placement should NOT be removed from the coordinator
-- and from the workers
SELECT count(*) FROM pg_dist_placement WHERE shardid IN (101500, 101501, 101502);
SELECT count(*) FROM pg_dist_placement p JOIN pg_dist_node n USING(groupid)
WHERE n.isactive AND n.noderole = 'primary'
AND p.shardid IN (101500, 101501, 101502);
count
---------------------------------------------------------------------
3
(1 row)
\c - - - :worker_1_port
SELECT count(*) FROM pg_dist_placement WHERE shardid IN (101500, 101501, 101502);
SELECT count(*) FROM pg_dist_placement p JOIN pg_dist_node n USING(groupid)
WHERE n.isactive AND n.noderole = 'primary'
AND p.shardid IN (101500, 101501, 101502);
count
---------------------------------------------------------------------
3


@@ -37,8 +37,8 @@ SELECT * FROM master_get_active_worker_nodes()
ORDER BY 1, 2;
SELECT shardid, shardstate
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
WHERE s.logicalrelid = 'user_table'::regclass
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid)
WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive
ORDER BY placementid;
-- verify node is not activated
@@ -46,8 +46,8 @@ SELECT * FROM master_get_active_worker_nodes()
ORDER BY 1, 2;
SELECT shardid, shardstate
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
WHERE s.logicalrelid = 'user_table'::regclass
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid)
WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive
ORDER BY placementid;
-- fail create schema command
@@ -60,8 +60,8 @@ SELECT * FROM master_get_active_worker_nodes()
ORDER BY 1, 2;
SELECT shardid, shardstate
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
WHERE s.logicalrelid = 'user_table'::regclass
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid)
WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive
ORDER BY placementid;
BEGIN;
@@ -81,8 +81,8 @@ SELECT * FROM master_get_active_worker_nodes()
ORDER BY 1, 2;
SELECT shardid, shardstate
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
WHERE s.logicalrelid = 'user_table'::regclass
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid)
WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive
ORDER BY placementid;
-- test master_add_inactive_node
@@ -93,8 +93,8 @@ SELECT master_add_inactive_node('localhost', :worker_2_proxy_port);
SELECT master_remove_node('localhost', :worker_2_proxy_port);
SELECT shardid, shardstate
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
WHERE s.logicalrelid = 'user_table'::regclass
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid)
WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive
ORDER BY placementid;
-- reset cluster to original state
@@ -106,8 +106,8 @@ SELECT * FROM master_get_active_worker_nodes()
ORDER BY 1, 2;
SELECT shardid, shardstate
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid)
WHERE s.logicalrelid = 'user_table'::regclass
FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid)
WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive
ORDER BY placementid;
SELECT citus.mitmproxy('conn.allow()');


@@ -286,8 +286,18 @@ SELECT verify_metadata('localhost', :worker_1_port),
-- Don't drop the reference table so it has shards on the nodes being disabled
DROP TABLE dist_table_1, dist_table_2;
SELECT pg_catalog.citus_disable_node_and_wait('localhost', :worker_2_port);
SELECT verify_metadata('localhost', :worker_1_port);
SELECT pg_catalog.citus_disable_node('localhost', :worker_2_port);
SELECT wait_until_metadata_sync(30000);
-- show that the node metadata is the same
-- note that we cannot use verify_metadata here
-- because there are several shards/placements in the
-- metadata that were modified manually on the coordinator
-- but not on the worker, and pg_catalog.citus_disable_node
-- does not sync the metadata
SELECT result FROM run_command_on_workers($$SELECT jsonb_agg(row_to_json(row(pg_dist_node.*))) FROM pg_dist_node$$) WHERE nodeport=:worker_1_port
EXCEPT
SELECT jsonb_agg(row_to_json(row(pg_dist_node.*)))::text FROM pg_dist_node;
SELECT 1 FROM master_activate_node('localhost', :worker_2_port);
SELECT verify_metadata('localhost', :worker_1_port);


@@ -18,12 +18,17 @@ INSERT INTO ref SELECT i,i FROM generate_series(0,10)i;
SELECT citus_disable_node('localhost', :worker_2_port, true);
SELECT public.wait_until_metadata_sync();
-- the placement should be removed both from the coordinator
-- the active placement should NOT be removed from the coordinator
-- and from the workers
SELECT count(*) FROM pg_dist_placement WHERE shardid IN (101500, 101501, 101502);
SELECT count(*) FROM pg_dist_placement p JOIN pg_dist_node n USING(groupid)
WHERE n.isactive AND n.noderole = 'primary'
AND p.shardid IN (101500, 101501, 101502);
\c - - - :worker_1_port
SELECT count(*) FROM pg_dist_placement WHERE shardid IN (101500, 101501, 101502);
SELECT count(*) FROM pg_dist_placement p JOIN pg_dist_node n USING(groupid)
WHERE n.isactive AND n.noderole = 'primary'
AND p.shardid IN (101500, 101501, 101502);
SET search_path TO disable_node_with_replicated_tables;
-- should be able to ingest data from both the worker and the coordinator
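Since the surviving placements all sit on active nodes, routing keeps working while worker_2 stays disabled. A minimal continuation of the test above, reusing the ref table created earlier in the file:

-- still connected to worker_1 with the test schema on the search_path;
-- ingesting into the replicated table succeeds despite the disabled node
INSERT INTO ref SELECT i, i FROM generate_series(11, 20) i;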