From 48a5450d595a00be4734956522cee305a9cd9587 Mon Sep 17 00:00:00 2001
From: gindibay
Date: Thu, 17 Aug 2023 12:05:58 +0300
Subject: [PATCH] Fixes review comments

---
 .../distributed/metadata/node_metadata.c | 159 ++++++++++--------
 1 file changed, 89 insertions(+), 70 deletions(-)

diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c
index 3a8df5ca9..7a1ef237a 100644
--- a/src/backend/distributed/metadata/node_metadata.c
+++ b/src/backend/distributed/metadata/node_metadata.c
@@ -9,7 +9,6 @@
 #include "funcapi.h"
 #include "utils/plancache.h"
 
-
 #include "access/genam.h"
 #include "access/heapam.h"
 #include "access/htup.h"
@@ -102,8 +101,8 @@ static HeapTuple GetNodeByNodeId(int32 nodeId);
 static int32 GetNextGroupId(void);
 static int GetNextNodeId(void);
 static void InsertPlaceholderCoordinatorRecord(void);
-static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata
-                          *nodeMetadata);
+static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport,
+                          NodeMetadata *nodeMetadata);
 static void DeleteNodeRow(char *nodename, int32 nodeport);
 static void BlockDistributedQueriesOnMetadataNodes(void);
 static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
@@ -163,7 +162,6 @@ PG_FUNCTION_INFO_V1(citus_coordinator_nodeid);
 PG_FUNCTION_INFO_V1(citus_is_coordinator);
 PG_FUNCTION_INFO_V1(citus_internal_mark_node_not_synced);
 
-
 /*
  * DefaultNodeMetadata creates a NodeMetadata struct with the fields set to
  * sane defaults, e.g. nodeRack = WORKER_DEFAULT_RACK.
@@ -547,7 +545,8 @@ citus_disable_node(PG_FUNCTION_ARGS)
                             "metadata is not allowed"),
                      errhint("You can force disabling node, SELECT "
                              "citus_disable_node('%s', %d, "
-                             "synchronous:=true);", workerNode->workerName,
+                             "synchronous:=true);",
+                             workerNode->workerName,
                              nodePort),
                      errdetail("Citus uses the first worker node in the "
                                "metadata for certain internal operations when "
@@ -696,8 +695,7 @@ citus_set_node_property(PG_FUNCTION_ARGS)
     else
     {
         ereport(ERROR, (errmsg(
-                            "only the 'shouldhaveshards' property can be set using this function"
-                            )));
+                            "only the 'shouldhaveshards' property can be set using this function")));
     }
 
     TransactionModifiedNodeMetadata = true;
@@ -1178,6 +1176,81 @@ LockShardsInWorkerPlacementList(WorkerNode *workerNode, LOCKMODE lockMode)
 }
 
 
+BackgroundWorkerHandle *
+CheckBackgroundWorkerToObtainLocks(int32 lock_cooldown)
+{
+    BackgroundWorkerHandle *handle = NULL;
+    handle = StartLockAcquireHelperBackgroundWorker(MyProcPid, lock_cooldown);
+    if (!handle)
+    {
+        /*
+         * We failed to start a background worker, which probably means that we exceeded
+         * max_worker_processes, and this is unlikely to be resolved by retrying. We do not want
+         * to repeatedly throw an error because if citus_update_node is called to complete a
+         * failover then finishing is the only way to bring the cluster back up. Therefore we
+         * give up on killing other backends and simply wait for the lock. We do set
+         * lock_timeout to lock_cooldown, because we don't want to wait forever to get a lock.
+         */
+        SetLockTimeoutLocally(lock_cooldown);
+        ereport(WARNING, (errmsg(
+                              "could not start background worker to kill backends with conflicting"
+                              " locks to force the update. Degrading to acquiring locks "
+                              "with a lock time out."),
+                          errhint(
+                              "Increasing max_worker_processes might help.")));
+    }
+    return handle;
+}
+
+
+/*
+ * LockPlacementsWithBackgroundWorkersInPrimaryNode locks shards on a primary node.
+ * If force is true, we start a background worker to kill backends holding
+ * conflicting locks with this backend.
+ *
+ * If the node is a primary node we block reads and writes.
+ *
+ * This lock has two purposes:
+ *
+ * - Ensure buggy code in Citus doesn't cause failures when the
+ *   nodename/nodeport of a node changes mid-query
+ *
+ * - Provide fencing during failover, after this function returns all
+ *   connections will use the new node location.
+ *
+ * Drawback:
+ *
+ * - This function blocks until all previous queries have finished. This
+ *   means that long-running queries will prevent failover.
+ *
+ *   In case of node failure said long-running queries will fail in the end
+ *   anyway as they will be unable to commit successfully on the failed
+ *   machine. To cause quick failure of these queries use force => true
+ *   during the invocation of citus_update_node to terminate conflicting
+ *   backends proactively.
+ *
+ * It might be worth blocking reads to a secondary for the same reasons,
+ * though we currently only query secondaries on follower clusters
+ * where these locks will have no effect.
+ */
+BackgroundWorkerHandle *
+LockPlacementsWithBackgroundWorkersInPrimaryNode(WorkerNode *workerNode, bool force,
+                                                 int32 lock_cooldown)
+{
+    BackgroundWorkerHandle *handle = NULL;
+
+    if (NodeIsPrimary(workerNode))
+    {
+        if (force)
+        {
+            handle = CheckBackgroundWorkerToObtainLocks(lock_cooldown);
+        }
+        LockShardsInWorkerPlacementList(workerNode, AccessExclusiveLock);
+    }
+    return handle;
+}
+
+
 /*
  * citus_update_node moves the requested node to a different nodename and nodeport. It
  * locks to ensure no queries are running concurrently; and is intended for customers who
@@ -1243,63 +1316,8 @@ citus_update_node(PG_FUNCTION_ARGS)
         EnsureTransactionalMetadataSyncMode();
     }
 
-    /*
-     * If the node is a primary node we block reads and writes.
-     *
-     * This lock has two purposes:
-     *
-     * - Ensure buggy code in Citus doesn't cause failures when the
-     *   nodename/nodeport of a node changes mid-query
-     *
-     * - Provide fencing during failover, after this function returns all
-     *   connections will use the new node location.
-     *
-     * Drawback:
-     *
-     * - This function blocks until all previous queries have finished. This
-     *   means that long-running queries will prevent failover.
-     *
-     *   In case of node failure said long-running queries will fail in the end
-     *   anyway as they will be unable to commit successfully on the failed
-     *   machine. To cause quick failure of these queries use force => true
-     *   during the invocation of citus_update_node to terminate conflicting
-     *   backends proactively.
-     *
-     * It might be worth blocking reads to a secondary for the same reasons,
-     * though we currently only query secondaries on follower clusters
-     * where these locks will have no effect.
-     */
-    if (NodeIsPrimary(workerNode))
-    {
-        /*
-         * before acquiring the locks check if we want a background worker to help us to
-         * aggressively obtain the locks.
-         */
-        if (force)
-        {
-            handle = StartLockAcquireHelperBackgroundWorker(MyProcPid, lock_cooldown);
-            if (!handle)
-            {
-                /*
-                 * We failed to start a background worker, which probably means that we exceeded
-                 * max_worker_processes, and this is unlikely to be resolved by retrying. We do not want
-                 * to repeatedly throw an error because if citus_update_node is called to complete a
-                 * failover then finishing is the only way to bring the cluster back up. Therefore we
-                 * give up on killing other backends and simply wait for the lock. We do set
-                 * lock_timeout to lock_cooldown, because we don't want to wait forever to get a lock.
-                 */
-                SetLockTimeoutLocally(lock_cooldown);
-                ereport(WARNING, (errmsg(
-                                      "could not start background worker to kill backends with conflicting"
-                                      " locks to force the update. Degrading to acquiring locks "
-                                      "with a lock time out."),
-                                  errhint(
-                                      "Increasing max_worker_processes might help.")));
-            }
-        }
-
-        LockShardsInWorkerPlacementList(workerNode, AccessExclusiveLock);
-    }
+    handle = LockPlacementsWithBackgroundWorkersInPrimaryNode(workerNode, force,
+                                                              lock_cooldown);
 
     /*
      * if we have planned statements such as prepared statements, we should clear the cache so that
@@ -1358,6 +1376,8 @@ citus_pause_node_within_txn(PG_FUNCTION_ARGS)
     CheckCitusVersion(ERROR);
 
     int32 nodeId = PG_GETARG_INT32(0);
+    bool force = PG_GETARG_BOOL(1);
+    int32 lock_cooldown = PG_GETARG_INT32(2);
 
     WorkerNode *workerNode = FindNodeAnyClusterByNodeId(nodeId);
     if (workerNode == NULL)
@@ -1366,10 +1386,7 @@ citus_pause_node_within_txn(PG_FUNCTION_ARGS)
                 errmsg("node %u not found", nodeId)));
     }
 
-    if (NodeIsPrimary(workerNode))
-    {
-        LockShardsInWorkerPlacementList(workerNode, AccessExclusiveLock);
-    }
+    LockPlacementsWithBackgroundWorkersInPrimaryNode(workerNode, force, lock_cooldown);
 
     PG_RETURN_VOID();
 }
@@ -1992,7 +2009,8 @@ ErrorIfNodeContainsNonRemovablePlacements(WorkerNode *workerNode)
             ereport(ERROR, (errmsg("cannot remove or disable the node "
                                    "%s:%d because because it contains "
                                    "the only shard placement for "
-                                   "shard " UINT64_FORMAT, workerNode->workerName,
+                                   "shard " UINT64_FORMAT,
+                                   workerNode->workerName,
                                    workerNode->workerPort, placement->shardId),
                             errdetail("One of the table(s) that prevents the operation "
                                       "complete successfully is %s",
@@ -2544,7 +2562,8 @@ ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value, char *fi
     if (!valueBool && workerNode->groupId == COORDINATOR_GROUP_ID)
     {
         ereport(ERROR, (errmsg("cannot change \"%s\" field of the "
-                               "coordinator node", field)));
+                               "coordinator node",
+                               field)));
     }
 }
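
Note: the comment moved by this patch recommends passing force => true so that backends holding
conflicting locks are terminated proactively instead of blocking a failover. The SQL sketch below
shows how the two entry points touched here would be called under that advice. The node id,
hostname, and port are placeholders, and the named-argument form for citus_pause_node_within_txn
is an assumption based on the new PG_GETARG_BOOL(1)/PG_GETARG_INT32(2) reads; the SQL-level
signature change is not part of this file.

    -- Failover: move node 123 to a new host, terminating conflicting backends
    -- after a lock_cooldown of 10000 ms instead of waiting for their locks.
    SELECT citus_update_node(123, 'new-worker-hostname', 5432,
                             force => true, lock_cooldown => 10000);

    -- Pause a node inside a transaction; the placement locks taken by
    -- LockPlacementsWithBackgroundWorkersInPrimaryNode are held until COMMIT.
    BEGIN;
    SELECT citus_pause_node_within_txn(123, force => true, lock_cooldown => 10000);
    -- ... perform maintenance while writes to the node are blocked ...
    COMMIT;

With force => false no background worker is started; for a primary node both functions then simply
wait until the placement locks become available, and for non-primary nodes no locks are taken.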