Fixes review comments

test_branch
gindibay 2023-07-29 20:01:27 +03:00
parent b471bb04d2
commit 339a47a18a
3 changed files with 15 additions and 9 deletions

View File

@ -134,6 +134,7 @@ static void MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context,
static void EnsureParentSessionHasExclusiveLockOnPgDistNode(pid_t parentSessionPid);
static void SetNodeMetadata(MetadataSyncContext *context, bool localOnly);
static void EnsureTransactionalMetadataSyncMode(void);
static void lock_shards_in_worker_placement_list(WorkerNode *workerNode, LOCKMODE lockMode);
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(citus_set_coordinator_host);
@ -1160,6 +1161,15 @@ ActivateNodeList(MetadataSyncContext *context)
SetNodeMetadata(context, localOnly);
}
/*
* Adds locks into all shards placed into given workerNode.
*/
void lock_shards_in_worker_placement_list(WorkerNode *workerNode , LOCKMODE lockMode){
List *placementList = NIL;
placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId);
LockShardsInPlacementListMetadata(placementList, lockMode);
}
/*
* citus_update_node moves the requested node to a different nodename and nodeport. It
@ -1189,7 +1199,6 @@ citus_update_node(PG_FUNCTION_ARGS)
int32 lock_cooldown = PG_GETARG_INT32(4);
char *newNodeNameString = text_to_cstring(newNodeName);
List *placementList = NIL;
BackgroundWorkerHandle *handle = NULL;
WorkerNode *workerNodeWithSameAddress = FindWorkerNodeAnyCluster(newNodeNameString,
@ -1282,8 +1291,7 @@ citus_update_node(PG_FUNCTION_ARGS)
}
}
placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId);
LockShardsInPlacementListMetadata(placementList, AccessExclusiveLock);
lock_shards_in_worker_placement_list(workerNode, AccessExclusiveLock);
}
/*
@ -1331,13 +1339,14 @@ citus_update_node(PG_FUNCTION_ARGS)
}
Datum
citus_pause_node_within_txn(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
int32 nodeId = PG_GETARG_INT32(0);
List *placementList = NIL;
WorkerNode *workerNode = FindNodeAnyClusterByNodeId(nodeId);
if (workerNode == NULL)
@ -1346,11 +1355,9 @@ citus_pause_node_within_txn(PG_FUNCTION_ARGS)
errmsg("node %u not found", nodeId)));
}
if (NodeIsPrimary(workerNode))
{
placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId);
LockShardsInPlacementListMetadata(placementList, AccessExclusiveLock);
lock_shards_in_worker_placement_list(workerNode, AccessExclusiveLock);
}
PG_RETURN_VOID();

View File

@ -103,7 +103,7 @@ ORDER BY 1;
function citus_nodeid_for_gpid(bigint)
function citus_nodename_for_nodeid(integer)
function citus_nodeport_for_nodeid(integer)
function citus_pause_node(integer)
function citus_pause_node_within_txn(integer)
function citus_pid_for_gpid(bigint)
function citus_prepare_pg_upgrade()
function citus_query_stats()

View File

@ -80,7 +80,6 @@ test: isolation_schema_based_sharding
test: isolation_citus_pause_node
test: isolation_citus_schema_distribute_undistribute
# Rebalancer
test: isolation_blocking_move_single_shard_commands
test: isolation_blocking_move_multi_shard_commands