diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index b56f094b0..bd9fb55d9 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -496,16 +496,6 @@ citus_disable_node(PG_FUNCTION_ARGS) workerNode->workerName, nodePort))); } - - /* - * Delete replicated table placements from the coordinator's metadata, - * but not remotely. That is because one more more of the remote - * nodes might be down. Instead, we let the background worker - * to sync the metadata when possible. - */ - bool forceRemoteDelete = false; - DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId, - forceRemoteDelete); } TransactionModifiedNodeMetadata = true; @@ -515,6 +505,12 @@ citus_disable_node(PG_FUNCTION_ARGS) * active nodes get the metadata updates. We defer this operation to the * background worker to make it possible disabling nodes when multiple nodes * are down. + * + * Note that the active placements reside on the active nodes. Hence, when + * Citus finds active placements, it filters out the placements that are on + * the disabled nodes. That's why, we don't have to change/sync placement + * metadata at this point. Instead, we defer that to citus_activate_node() + * where we expect all nodes up and running. */ if (UnsetMetadataSyncedForAll()) { @@ -868,6 +864,22 @@ ActivateNode(char *nodeName, int nodePort) ereport(ERROR, (errmsg("node at \"%s:%u\" does not exist", nodeName, nodePort))); } + /* + * Delete existing reference and replicated table placements on the + * given groupId if the group has been disabled earlier (e.g., isActive + * set to false). + * + * Sync the metadata changes to all existing metadata nodes irrespective + * of the current nodes' metadata sync state. We expect all nodes up + * and running when another node is activated. + */ + if (!workerNode->isActive && NodeIsPrimary(workerNode)) + { + bool localOnly = false; + DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId, + localOnly); + } + workerNode = SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive, BoolGetDatum(isActive)); @@ -1353,9 +1365,9 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort) * Delete reference table placements so they are not taken into account * for the check if there are placements after this. */ - bool forceRemoteDelete = true; + bool localOnly = false; DeleteAllReplicatedTablePlacementsFromNodeGroup(workerNode->groupId, - forceRemoteDelete); + localOnly); /* * Secondary nodes are read-only, never 2PC is used. diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 93cdcc7fa..c20e38034 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -432,7 +432,7 @@ CreateReferenceTableColocationId() * all placements from pg_dist_placement table for given group. */ void -DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool forceRemoteDelete) +DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool localOnly) { List *referenceTableList = CitusTableTypeIdList(REFERENCE_TABLE); List *replicatedMetadataSyncedDistributedTableList = @@ -469,7 +469,7 @@ DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool forceRemoteD DeleteShardPlacementRow(placement->placementId); - if (forceRemoteDelete) + if (!localOnly) { resetStringInfo(deletePlacementCommand); appendStringInfo(deletePlacementCommand, diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h index 845f507a2..323f8e355 100644 --- a/src/include/distributed/reference_table_utils.h +++ b/src/include/distributed/reference_table_utils.h @@ -21,8 +21,8 @@ extern void EnsureReferenceTablesExistOnAllNodes(void); extern void EnsureReferenceTablesExistOnAllNodesExtended(char transferMode); extern uint32 CreateReferenceTableColocationId(void); -extern void DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, bool - forceRemoteDelete); +extern void DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId, + bool localOnly); extern int CompareOids(const void *leftElement, const void *rightElement); extern int ReferenceTableReplicationFactor(void); extern void ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort); diff --git a/src/test/regress/expected/failure_add_disable_node.out b/src/test/regress/expected/failure_add_disable_node.out index 72953f6a9..ef78e451b 100644 --- a/src/test/regress/expected/failure_add_disable_node.out +++ b/src/test/regress/expected/failure_add_disable_node.out @@ -74,8 +74,8 @@ ORDER BY 1, 2; (1 row) SELECT shardid, shardstate -FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) -WHERE s.logicalrelid = 'user_table'::regclass +FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid) +WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive ORDER BY placementid; shardid | shardstate --------------------------------------------------------------------- @@ -91,8 +91,8 @@ ORDER BY 1, 2; (1 row) SELECT shardid, shardstate -FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) -WHERE s.logicalrelid = 'user_table'::regclass +FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid) +WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive ORDER BY placementid; shardid | shardstate --------------------------------------------------------------------- @@ -120,8 +120,8 @@ ORDER BY 1, 2; (1 row) SELECT shardid, shardstate -FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) -WHERE s.logicalrelid = 'user_table'::regclass +FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid) +WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive ORDER BY placementid; shardid | shardstate --------------------------------------------------------------------- @@ -157,8 +157,8 @@ ORDER BY 1, 2; (1 row) SELECT shardid, shardstate -FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) -WHERE s.logicalrelid = 'user_table'::regclass +FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid) +WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive ORDER BY placementid; shardid | shardstate --------------------------------------------------------------------- @@ -181,8 +181,8 @@ SELECT master_remove_node('localhost', :worker_2_proxy_port); (1 row) SELECT shardid, shardstate -FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) -WHERE s.logicalrelid = 'user_table'::regclass +FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid) +WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive ORDER BY placementid; shardid | shardstate --------------------------------------------------------------------- @@ -212,8 +212,8 @@ ORDER BY 1, 2; (2 rows) SELECT shardid, shardstate -FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) -WHERE s.logicalrelid = 'user_table'::regclass +FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid) +WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive ORDER BY placementid; shardid | shardstate --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out index 78fb46b9e..b851d6909 100644 --- a/src/test/regress/expected/multi_cluster_management.out +++ b/src/test/regress/expected/multi_cluster_management.out @@ -510,6 +510,7 @@ ERROR: there is a shard placement in node group 6 but there are no nodes in tha SELECT * INTO old_placements FROM pg_dist_placement WHERE groupid = :worker_2_group; DELETE FROM pg_dist_placement WHERE groupid = :worker_2_group; SELECT master_add_node('localhost', :worker_2_port) AS new_node \gset +WARNING: could not find any shard placements for shardId 1220001 WARNING: citus.enable_object_propagation is off, not creating distributed objects on worker DETAIL: distributed objects are only kept in sync when citus.enable_object_propagation is set to on. Newly activated nodes will not get these objects created WARNING: could not find any shard placements for shardId 1220001 diff --git a/src/test/regress/expected/multi_mx_node_metadata.out b/src/test/regress/expected/multi_mx_node_metadata.out index b9fb854ee..d46f49caf 100644 --- a/src/test/regress/expected/multi_mx_node_metadata.out +++ b/src/test/regress/expected/multi_mx_node_metadata.out @@ -641,19 +641,32 @@ SELECT verify_metadata('localhost', :worker_1_port), --------------------------------------------------------------------- -- Don't drop the reference table so it has shards on the nodes being disabled DROP TABLE dist_table_1, dist_table_2; -SELECT pg_catalog.citus_disable_node_and_wait('localhost', :worker_2_port); +SELECT pg_catalog.citus_disable_node('localhost', :worker_2_port); NOTICE: Node localhost:xxxxx has active shard placements. Some queries may fail after this operation. Use SELECT citus_activate_node('localhost', 57638) to activate this node back. - citus_disable_node_and_wait + citus_disable_node --------------------------------------------------------------------- (1 row) -SELECT verify_metadata('localhost', :worker_1_port); - verify_metadata +SELECT wait_until_metadata_sync(30000); + wait_until_metadata_sync --------------------------------------------------------------------- - t + (1 row) +-- show that node metadata is the same +-- note that we cannot use verify_metadata here +-- because there are several shards/placements +-- in the metadata that are manually modified on the coordinator +-- not on the worker, and pg_catalog.citus_disable_node does +-- not sync the metadata +SELECT result FROM run_command_on_workers($$SELECT jsonb_agg(row_to_json(row(pg_dist_node.*))) FROM pg_dist_node$$) WHERE nodeport=:worker_1_port +EXCEPT +SELECT jsonb_agg(row_to_json(row(pg_dist_node.*)))::text FROM pg_dist_node; + result +--------------------------------------------------------------------- +(0 rows) + SELECT 1 FROM master_activate_node('localhost', :worker_2_port); ?column? --------------------------------------------------------------------- diff --git a/src/test/regress/expected/replicated_table_disable_node.out b/src/test/regress/expected/replicated_table_disable_node.out index 927589676..aa9de483f 100644 --- a/src/test/regress/expected/replicated_table_disable_node.out +++ b/src/test/regress/expected/replicated_table_disable_node.out @@ -32,16 +32,20 @@ SELECT public.wait_until_metadata_sync(); (1 row) --- the placement should be removed both from the coordinator +-- the active placement should NOT be removed from the coordinator -- and from the workers -SELECT count(*) FROM pg_dist_placement WHERE shardid IN (101500, 101501, 101502); +SELECT count(*) FROM pg_dist_placement p JOIN pg_dist_node n USING(groupid) + WHERE n.isactive AND n.noderole = 'primary' + AND p.shardid IN (101500, 101501, 101502); count --------------------------------------------------------------------- 3 (1 row) \c - - - :worker_1_port -SELECT count(*) FROM pg_dist_placement WHERE shardid IN (101500, 101501, 101502); +SELECT count(*) FROM pg_dist_placement p JOIN pg_dist_node n USING(groupid) + WHERE n.isactive AND n.noderole = 'primary' + AND p.shardid IN (101500, 101501, 101502); count --------------------------------------------------------------------- 3 diff --git a/src/test/regress/sql/failure_add_disable_node.sql b/src/test/regress/sql/failure_add_disable_node.sql index e97f1d242..1aa8f9526 100644 --- a/src/test/regress/sql/failure_add_disable_node.sql +++ b/src/test/regress/sql/failure_add_disable_node.sql @@ -37,8 +37,8 @@ SELECT * FROM master_get_active_worker_nodes() ORDER BY 1, 2; SELECT shardid, shardstate -FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) -WHERE s.logicalrelid = 'user_table'::regclass +FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid) +WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive ORDER BY placementid; -- verify node is not activated @@ -46,8 +46,8 @@ SELECT * FROM master_get_active_worker_nodes() ORDER BY 1, 2; SELECT shardid, shardstate -FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) -WHERE s.logicalrelid = 'user_table'::regclass +FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid) +WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive ORDER BY placementid; -- fail create schema command @@ -60,8 +60,8 @@ SELECT * FROM master_get_active_worker_nodes() ORDER BY 1, 2; SELECT shardid, shardstate -FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) -WHERE s.logicalrelid = 'user_table'::regclass +FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid) +WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive ORDER BY placementid; BEGIN; @@ -81,8 +81,8 @@ SELECT * FROM master_get_active_worker_nodes() ORDER BY 1, 2; SELECT shardid, shardstate -FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) -WHERE s.logicalrelid = 'user_table'::regclass +FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid) +WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive ORDER BY placementid; -- test master_add_inactive_node @@ -93,8 +93,8 @@ SELECT master_add_inactive_node('localhost', :worker_2_proxy_port); SELECT master_remove_node('localhost', :worker_2_proxy_port); SELECT shardid, shardstate -FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) -WHERE s.logicalrelid = 'user_table'::regclass +FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid) +WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive ORDER BY placementid; -- reset cluster to original state @@ -106,8 +106,8 @@ SELECT * FROM master_get_active_worker_nodes() ORDER BY 1, 2; SELECT shardid, shardstate -FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) -WHERE s.logicalrelid = 'user_table'::regclass +FROM pg_dist_placement p JOIN pg_dist_shard s USING (shardid) JOIN pg_dist_node n USING(groupid) +WHERE s.logicalrelid = 'user_table'::regclass AND n.isactive ORDER BY placementid; SELECT citus.mitmproxy('conn.allow()'); diff --git a/src/test/regress/sql/multi_mx_node_metadata.sql b/src/test/regress/sql/multi_mx_node_metadata.sql index 79558fc3a..09e8e49f1 100644 --- a/src/test/regress/sql/multi_mx_node_metadata.sql +++ b/src/test/regress/sql/multi_mx_node_metadata.sql @@ -286,8 +286,18 @@ SELECT verify_metadata('localhost', :worker_1_port), -- Don't drop the reference table so it has shards on the nodes being disabled DROP TABLE dist_table_1, dist_table_2; -SELECT pg_catalog.citus_disable_node_and_wait('localhost', :worker_2_port); -SELECT verify_metadata('localhost', :worker_1_port); +SELECT pg_catalog.citus_disable_node('localhost', :worker_2_port); +SELECT wait_until_metadata_sync(30000); + +-- show that node metadata is the same +-- note that we cannot use verify_metadata here +-- because there are several shards/placements +-- in the metadata that are manually modified on the coordinator +-- not on the worker, and pg_catalog.citus_disable_node does +-- not sync the metadata +SELECT result FROM run_command_on_workers($$SELECT jsonb_agg(row_to_json(row(pg_dist_node.*))) FROM pg_dist_node$$) WHERE nodeport=:worker_1_port +EXCEPT +SELECT jsonb_agg(row_to_json(row(pg_dist_node.*)))::text FROM pg_dist_node; SELECT 1 FROM master_activate_node('localhost', :worker_2_port); SELECT verify_metadata('localhost', :worker_1_port); diff --git a/src/test/regress/sql/replicated_table_disable_node.sql b/src/test/regress/sql/replicated_table_disable_node.sql index cff581f72..cf321d3c6 100644 --- a/src/test/regress/sql/replicated_table_disable_node.sql +++ b/src/test/regress/sql/replicated_table_disable_node.sql @@ -18,12 +18,17 @@ INSERT INTO ref SELECT i,i FROM generate_series(0,10)i; SELECT citus_disable_node('localhost', :worker_2_port, true); SELECT public.wait_until_metadata_sync(); --- the placement should be removed both from the coordinator +-- the active placement should NOT be removed from the coordinator -- and from the workers -SELECT count(*) FROM pg_dist_placement WHERE shardid IN (101500, 101501, 101502); +SELECT count(*) FROM pg_dist_placement p JOIN pg_dist_node n USING(groupid) + WHERE n.isactive AND n.noderole = 'primary' + AND p.shardid IN (101500, 101501, 101502); \c - - - :worker_1_port -SELECT count(*) FROM pg_dist_placement WHERE shardid IN (101500, 101501, 101502); +SELECT count(*) FROM pg_dist_placement p JOIN pg_dist_node n USING(groupid) + WHERE n.isactive AND n.noderole = 'primary' + AND p.shardid IN (101500, 101501, 101502); + SET search_path TO disable_node_with_replicated_tables; -- should be able to ingest data from both the worker and the coordinator