From 339a47a18a791f621399018e33e3e2ea5bf6af31 Mon Sep 17 00:00:00 2001 From: gindibay Date: Sat, 29 Jul 2023 20:01:27 +0300 Subject: [PATCH] Fixes review comments --- .../distributed/metadata/node_metadata.c | 21 ++++++++++++------- .../expected/upgrade_list_citus_objects.out | 2 +- src/test/regress/isolation_schedule | 1 - 3 files changed, 15 insertions(+), 9 deletions(-) diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 44e80404c..aac0f0155 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -134,6 +134,7 @@ static void MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context, static void EnsureParentSessionHasExclusiveLockOnPgDistNode(pid_t parentSessionPid); static void SetNodeMetadata(MetadataSyncContext *context, bool localOnly); static void EnsureTransactionalMetadataSyncMode(void); +static void lock_shards_in_worker_placement_list(WorkerNode *workerNode, LOCKMODE lockMode); /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(citus_set_coordinator_host); @@ -1160,6 +1161,15 @@ ActivateNodeList(MetadataSyncContext *context) SetNodeMetadata(context, localOnly); } +/* +* Adds locks into all shards placed into given workerNode. +*/ +void lock_shards_in_worker_placement_list(WorkerNode *workerNode , LOCKMODE lockMode){ + List *placementList = NIL; + + placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId); + LockShardsInPlacementListMetadata(placementList, lockMode); +} /* * citus_update_node moves the requested node to a different nodename and nodeport. It @@ -1189,7 +1199,6 @@ citus_update_node(PG_FUNCTION_ARGS) int32 lock_cooldown = PG_GETARG_INT32(4); char *newNodeNameString = text_to_cstring(newNodeName); - List *placementList = NIL; BackgroundWorkerHandle *handle = NULL; WorkerNode *workerNodeWithSameAddress = FindWorkerNodeAnyCluster(newNodeNameString, @@ -1282,8 +1291,7 @@ citus_update_node(PG_FUNCTION_ARGS) } } - placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId); - LockShardsInPlacementListMetadata(placementList, AccessExclusiveLock); + lock_shards_in_worker_placement_list(workerNode, AccessExclusiveLock); } /* @@ -1331,13 +1339,14 @@ citus_update_node(PG_FUNCTION_ARGS) } + + Datum citus_pause_node_within_txn(PG_FUNCTION_ARGS) { CheckCitusVersion(ERROR); int32 nodeId = PG_GETARG_INT32(0); - List *placementList = NIL; WorkerNode *workerNode = FindNodeAnyClusterByNodeId(nodeId); if (workerNode == NULL) @@ -1346,11 +1355,9 @@ citus_pause_node_within_txn(PG_FUNCTION_ARGS) errmsg("node %u not found", nodeId))); } - if (NodeIsPrimary(workerNode)) { - placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId); - LockShardsInPlacementListMetadata(placementList, AccessExclusiveLock); + lock_shards_in_worker_placement_list(workerNode, AccessExclusiveLock); } PG_RETURN_VOID(); diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index efc93f651..b94a120bc 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -103,7 +103,7 @@ ORDER BY 1; function citus_nodeid_for_gpid(bigint) function citus_nodename_for_nodeid(integer) function citus_nodeport_for_nodeid(integer) - function citus_pause_node(integer) + function citus_pause_node_within_txn(integer) function citus_pid_for_gpid(bigint) function citus_prepare_pg_upgrade() function citus_query_stats() diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index 5fe9af7bc..d8cc77c73 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -80,7 +80,6 @@ test: isolation_schema_based_sharding test: isolation_citus_pause_node test: isolation_citus_schema_distribute_undistribute - # Rebalancer test: isolation_blocking_move_single_shard_commands test: isolation_blocking_move_multi_shard_commands