From 9f2d9e1487533337ac4e2f126b0fed455d930893 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 5 Jan 2022 13:08:23 +0100 Subject: [PATCH] Move placement deletion from disable node to activate node We prefer the background daemon to only sync node metadata. That's why we move placement metadata changes from disable node to activate node. With that, we can make sure that disable node only changes node metadata, whereas activate node syncs all the metadata changes. In essence, we already expect all nodes to be up when a node is activated. So, this does not change the behavior much. --- .../distributed/metadata/node_metadata.c | 36 ++++++++++++------- .../distributed/utils/reference_table_utils.c | 4 +-- .../distributed/reference_table_utils.h | 4 +-- .../expected/failure_add_disable_node.out | 24 ++++++------- .../expected/multi_cluster_management.out | 1 + .../expected/multi_mx_node_metadata.out | 23 +++++++++--- .../replicated_table_disable_node.out | 10 ++++-- .../regress/sql/failure_add_disable_node.sql | 24 ++++++------- .../regress/sql/multi_mx_node_metadata.sql | 14 ++++++-- .../sql/replicated_table_disable_node.sql | 11 ++++-- 10 files changed, 98 insertions(+), 53 deletions(-) diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index b56f094b0..bd9fb55d9 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -496,16 +496,6 @@ citus_disable_node(PG_FUNCTION_ARGS) workerNode->workerName, nodePort))); } - - /* - * Delete replicated table placements from the coordinator's metadata, - * but not remotely. That is because one more more of the remote - * nodes might be down. Instead, we let the background worker - * to sync the metadata when possible. - */ - bool forceRemoteDelete = false; - DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId, - forceRemoteDelete); } TransactionModifiedNodeMetadata = true; @@ -515,6 +505,12 @@ citus_disable_node(PG_FUNCTION_ARGS) * active nodes get the metadata updates. We defer this operation to the * background worker to make it possible disabling nodes when multiple nodes * are down. + * + * Note that the active placements reside on the active nodes. Hence, when + * Citus finds active placements, it filters out the placements that are on + * the disabled nodes. That's why, we don't have to change/sync placement + * metadata at this point. Instead, we defer that to citus_activate_node() + * where we expect all nodes up and running. */ if (UnsetMetadataSyncedForAll()) { @@ -868,6 +864,22 @@ ActivateNode(char *nodeName, int nodePort) ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort))); } + /* + * Delete existing reference and replicated table placements on the + * given groupId if the group has been disabled earlier (e.g., isActive + * set to false). + * + * Sync the metadata changes to all existing metadata nodes irrespective + * of the current nodes' metadata sync state. We expect all nodes up + * and running when another node is activated. 
+ */ + if (!workerNode->isActive && NodeIsPrimary(workerNode)) + { + bool localOnly = false; + DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId, + localOnly); + } + workerNode = SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive, BoolGetDatum(isActive)); @@ -1353,9 +1365,9 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort) * Delete reference table placements so they are not taken into account * for the check if there are placements after this. */ - bool forceRemoteDelete = true; + bool localOnly = false; DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId, - forceRemoteDelete); + localOnly); /* * Secondary nodes are read-only, never 2PC is used. diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 93cdcc7fa..c20e38034 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -432,7 +432,7 @@ CreateReferenceTableColocationId() * all placements from pg_dist_placement table for given group. */ void -DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool forceRemoteDelete) +DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool localOnly) { List *referenceTableList = CitusTableTypeIdList(REFERENCE_TABLE); List *replicatedMetadataSyncedDistributedTableList = @@ -469,7 +469,7 @@ DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool forceRemoteD DeleteShardPlacementRow(placement->placementId); - if (forceRemoteDelete) + if (!localOnly) { resetStringInfo(deletePlacementCommand); appendStringInfo(deletePlacementCommand, diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h index 845f507a2..323f8e355 100644 --- a/src/include/distributed/reference_table_utils.h +++ b/src/include/distributed/reference_table_utils.h @@ -21,8 +21,8 @@ extern void EnsureReferenceTablesExistOnAllNodes(void); extern void EnsureReferenceTablesExistOnAllNodesExtended(char transferMode); extern uint32 CreateReferenceTableColocationId(void); -extern void DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool - forceRemoteDelete); +extern void DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, + bool localOnly); extern int CompareOids(const void *leftElement, const void *rightElement); extern int ReferenceTableReplicationFactor(void); extern void ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort); diff --git a/src/test/regress/expected/failure_add_disable_node.out b/src/test/regress/expected/failure_add_disable_node.out index 72953f6a9..ef78e451b 100644 --- a/src/test/regress/expected/failure_add_disable_node.out +++ b/src/test/regress/expected/failure_add_disable_node.out @@ -74,8 +74,8 @@ ORDER BY 1, 2; (1 row) SELECT shardid, shardstate -FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) -WHERE s.logicalrelid = 'user_table'::regclass +FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid) +WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive ORDER BY placementid; shardid | shardstate --------------------------------------------------------------------- @@ -91,8 +91,8 @@ ORDER BY 1, 2; (1 row) SELECT shardid, shardstate -FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) -WHERE s.logicalrelid = 'user_table'::regclass +FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid) +WHERE 
s.logicalrelid = 'user_table'::regclass AND n.isactive ORDER BY placementid; shardid | shardstate --------------------------------------------------------------------- @@ -120,8 +120,8 @@ ORDER BY 1, 2; (1 row) SELECT shardid, shardstate -FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) -WHERE s.logicalrelid = 'user_table'::regclass +FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid) +WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive ORDER BY placementid; shardid | shardstate --------------------------------------------------------------------- @@ -157,8 +157,8 @@ ORDER BY 1, 2; (1 row) SELECT shardid, shardstate -FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) -WHERE s.logicalrelid = 'user_table'::regclass +FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid) +WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive ORDER BY placementid; shardid | shardstate --------------------------------------------------------------------- @@ -181,8 +181,8 @@ SELECT master_remove_node('localhost', :worker_2_proxy_port); (1 row) SELECT shardid, shardstate -FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) -WHERE s.logicalrelid = 'user_table'::regclass +FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid) +WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive ORDER BY placementid; shardid | shardstate --------------------------------------------------------------------- @@ -212,8 +212,8 @@ ORDER BY 1, 2; (2 rows) SELECT shardid, shardstate -FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) -WHERE s.logicalrelid = 'user_table'::regclass +FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid) +WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive ORDER BY placementid; shardid | shardstate --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out index 78fb46b9e..b851d6909 100644 --- a/src/test/regress/expected/multi_cluster_management.out +++ b/src/test/regress/expected/multi_cluster_management.out @@ -510,6 +510,7 @@ ERROR: there is a shard placement in node group 6 but there are no nodes in tha SELECT * INTO old_placements FROM pg_dist_placement WHERE groupid = :worker_2_group; DELETE FROM pg_dist_placement WHERE groupid = :worker_2_group; SELECT master_add_node('localhost', :worker_2_port) AS new_node \gset +WARNING: could not find any shard placements for shardId 1220001 WARNING: citus.enable_object_propagation is off, not creating distributed objects on worker DETAIL: distributed objects are only kept in sync when citus.enable_object_propagation is set to on. 
Newly activated nodes will not get these objects created WARNING: could not find any shard placements for shardId 1220001 diff --git a/src/test/regress/expected/multi_mx_node_metadata.out b/src/test/regress/expected/multi_mx_node_metadata.out index b9fb854ee..d46f49caf 100644 --- a/src/test/regress/expected/multi_mx_node_metadata.out +++ b/src/test/regress/expected/multi_mx_node_metadata.out @@ -641,19 +641,32 @@ SELECT verify_metadata('localhost', :worker_1_port), --------------------------------------------------------------------- -- Don't drop the reference table so it has shards on the nodes being disabled DROP TABLE dist_table_1, dist_table_2; -SELECT pg_catalog.citus_disable_node_and_wait('localhost', :worker_2_port); +SELECT pg_catalog.citus_disable_node('localhost', :worker_2_port); NOTICE: Node localhost:xxxxx has active shard placements. Some queries may fail after this operation. Use SELECT citus_activate_node('localhost', 57638) to activate this node back. - citus_disable_node_and_wait + citus_disable_node --------------------------------------------------------------------- (1 row) -SELECT verify_metadata('localhost', :worker_1_port); - verify_metadata +SELECT wait_until_metadata_sync(30000); + wait_until_metadata_sync --------------------------------------------------------------------- - t + (1 row) +-- show that node metadata is the same +-- note that we cannot use verify_metadata here +-- because there are several shards/placements +-- in the metadata that are manually modified on the coordinator +-- not on the worker, and pg_catalog.citus_disable_node does +-- not sync the metadata +SELECT result FROM run_command_on_workers($$SELECT jsonb_agg(row_to_json(row(pg_dist_node.*))) FROM pg_dist_node$$) WHERE nodeport=:worker_1_port +EXCEPT +SELECT jsonb_agg(row_to_json(row(pg_dist_node.*)))::text FROM pg_dist_node; + result +--------------------------------------------------------------------- +(0 rows) + SELECT 1 FROM master_activate_node('localhost', :worker_2_port); ?column? 
--------------------------------------------------------------------- diff --git a/src/test/regress/expected/replicated_table_disable_node.out b/src/test/regress/expected/replicated_table_disable_node.out index 927589676..aa9de483f 100644 --- a/src/test/regress/expected/replicated_table_disable_node.out +++ b/src/test/regress/expected/replicated_table_disable_node.out @@ -32,16 +32,20 @@ SELECT public.wait_until_metadata_sync(); (1 row) --- the placement should be removed both from the coordinator +-- the active placement should NOT be removed from the coordinator -- and from the workers -SELECT count(*) FROM pg_dist_placement WHERE shardid IN (101500, 101501, 101502); +SELECT count(*) FROM pg_dist_placement p JOIN pg_dist_node n USING(groupid) + WHERE n.isactive AND n.noderole = 'primary' + AND p.shardid IN (101500, 101501, 101502); count --------------------------------------------------------------------- 3 (1 row) \c - - - :worker_1_port -SELECT count(*) FROM pg_dist_placement WHERE shardid IN (101500, 101501, 101502); +SELECT count(*) FROM pg_dist_placement p JOIN pg_dist_node n USING(groupid) + WHERE n.isactive AND n.noderole = 'primary' + AND p.shardid IN (101500, 101501, 101502); count --------------------------------------------------------------------- 3 diff --git a/src/test/regress/sql/failure_add_disable_node.sql b/src/test/regress/sql/failure_add_disable_node.sql index e97f1d242..1aa8f9526 100644 --- a/src/test/regress/sql/failure_add_disable_node.sql +++ b/src/test/regress/sql/failure_add_disable_node.sql @@ -37,8 +37,8 @@ SELECT * FROM master_get_active_worker_nodes() ORDER BY 1, 2; SELECT shardid, shardstate -FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) -WHERE s.logicalrelid = 'user_table'::regclass +FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid) +WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive ORDER BY placementid; -- verify node is not activated @@ -46,8 +46,8 @@ SELECT * FROM master_get_active_worker_nodes() ORDER BY 1, 2; SELECT shardid, shardstate -FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) -WHERE s.logicalrelid = 'user_table'::regclass +FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid) +WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive ORDER BY placementid; -- fail create schema command @@ -60,8 +60,8 @@ SELECT * FROM master_get_active_worker_nodes() ORDER BY 1, 2; SELECT shardid, shardstate -FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) -WHERE s.logicalrelid = 'user_table'::regclass +FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid) +WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive ORDER BY placementid; BEGIN; @@ -81,8 +81,8 @@ SELECT * FROM master_get_active_worker_nodes() ORDER BY 1, 2; SELECT shardid, shardstate -FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) -WHERE s.logicalrelid = 'user_table'::regclass +FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid) +WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive ORDER BY placementid; -- test master_add_inactive_node @@ -93,8 +93,8 @@ SELECT master_add_inactive_node('localhost', :worker_2_proxy_port); SELECT master_remove_node('localhost', :worker_2_proxy_port); SELECT shardid, shardstate -FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) -WHERE s.logicalrelid = 'user_table'::regclass +FROM pg_dist_placement p 
JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid) +WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive ORDER BY placementid; -- reset cluster to original state @@ -106,8 +106,8 @@ SELECT * FROM master_get_active_worker_nodes() ORDER BY 1, 2; SELECT shardid, shardstate -FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) -WHERE s.logicalrelid = 'user_table'::regclass +FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid) +WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive ORDER BY placementid; SELECT citus.mitmproxy('conn.allow()'); diff --git a/src/test/regress/sql/multi_mx_node_metadata.sql b/src/test/regress/sql/multi_mx_node_metadata.sql index 79558fc3a..09e8e49f1 100644 --- a/src/test/regress/sql/multi_mx_node_metadata.sql +++ b/src/test/regress/sql/multi_mx_node_metadata.sql @@ -286,8 +286,18 @@ SELECT verify_metadata('localhost', :worker_1_port), -- Don't drop the reference table so it has shards on the nodes being disabled DROP TABLE dist_table_1, dist_table_2; -SELECT pg_catalog.citus_disable_node_and_wait('localhost', :worker_2_port); -SELECT verify_metadata('localhost', :worker_1_port); +SELECT pg_catalog.citus_disable_node('localhost', :worker_2_port); +SELECT wait_until_metadata_sync(30000); + +-- show that node metadata is the same +-- note that we cannot use verify_metadata here +-- because there are several shards/placements +-- in the metadata that are manually modified on the coordinator +-- not on the worker, and pg_catalog.citus_disable_node does +-- not sync the metadata +SELECT result FROM run_command_on_workers($$SELECT jsonb_agg(row_to_json(row(pg_dist_node.*))) FROM pg_dist_node$$) WHERE nodeport=:worker_1_port +EXCEPT +SELECT jsonb_agg(row_to_json(row(pg_dist_node.*)))::text FROM pg_dist_node; SELECT 1 FROM master_activate_node('localhost', :worker_2_port); SELECT verify_metadata('localhost', :worker_1_port); diff --git a/src/test/regress/sql/replicated_table_disable_node.sql b/src/test/regress/sql/replicated_table_disable_node.sql index cff581f72..cf321d3c6 100644 --- a/src/test/regress/sql/replicated_table_disable_node.sql +++ b/src/test/regress/sql/replicated_table_disable_node.sql @@ -18,12 +18,17 @@ INSERT INTO ref SELECT i,i FROM generate_series(0,10)i; SELECT citus_disable_node('localhost', :worker_2_port, true); SELECT public.wait_until_metadata_sync(); --- the placement should be removed both from the coordinator +-- the active placement should NOT be removed from the coordinator -- and from the workers -SELECT count(*) FROM pg_dist_placement WHERE shardid IN (101500, 101501, 101502); +SELECT count(*) FROM pg_dist_placement p JOIN pg_dist_node n USING(groupid) + WHERE n.isactive AND n.noderole = 'primary' + AND p.shardid IN (101500, 101501, 101502); \c - - - :worker_1_port -SELECT count(*) FROM pg_dist_placement WHERE shardid IN (101500, 101501, 101502); +SELECT count(*) FROM pg_dist_placement p JOIN pg_dist_node n USING(groupid) + WHERE n.isactive AND n.noderole = 'primary' + AND p.shardid IN (101500, 101501, 101502); + SET search_path TO disable_node_with_replicated_tables; -- should be able to ingest data from both the worker and the coordinator
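
A minimal sketch of the intended flow after this change, distilled from the regression
tests in this patch; the host, port 57638, and the presence of a replicated (reference)
table are illustrative assumptions, not part of the patch itself:

-- disabling a node only changes node metadata; the background daemon
-- propagates that change, and replicated placements are left in place
SELECT citus_disable_node('localhost', 57638, true);
SELECT public.wait_until_metadata_sync();

-- placements on the disabled node group are still recorded locally;
-- lookups of active placements filter them out via pg_dist_node.isactive
SELECT count(*)
FROM pg_dist_placement p JOIN pg_dist_node n USING (groupid)
WHERE NOT n.isactive;

-- activating the node back deletes the stale replicated-table placements for
-- that group, on the coordinator and on all metadata nodes (localOnly = false),
-- since all nodes are expected to be up at activation time
SELECT citus_activate_node('localhost', 57638);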