Fixes review comments

test_branch
gindibay 2023-08-17 12:05:58 +03:00
parent d9cecba67c
commit 48a5450d59
1 changed file with 89 additions and 70 deletions

@@ -9,7 +9,6 @@
 #include "funcapi.h"
 #include "utils/plancache.h"
 #include "access/genam.h"
 #include "access/heapam.h"
 #include "access/htup.h"
@@ -102,8 +101,8 @@ static HeapTuple GetNodeByNodeId(int32 nodeId);
 static int32 GetNextGroupId(void);
 static int GetNextNodeId(void);
 static void InsertPlaceholderCoordinatorRecord(void);
-static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata
-						  *nodeMetadata);
+static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport,
+						  NodeMetadata *nodeMetadata);
 static void DeleteNodeRow(char *nodename, int32 nodeport);
 static void BlockDistributedQueriesOnMetadataNodes(void);
 static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
@@ -163,7 +162,6 @@ PG_FUNCTION_INFO_V1(citus_coordinator_nodeid);
 PG_FUNCTION_INFO_V1(citus_is_coordinator);
 PG_FUNCTION_INFO_V1(citus_internal_mark_node_not_synced);
 
 /*
  * DefaultNodeMetadata creates a NodeMetadata struct with the fields set to
  * sane defaults, e.g. nodeRack = WORKER_DEFAULT_RACK.
@@ -547,7 +545,8 @@ citus_disable_node(PG_FUNCTION_ARGS)
 						"metadata is not allowed"),
 				 errhint("You can force disabling node, SELECT "
 						 "citus_disable_node('%s', %d, "
-						 "synchronous:=true);", workerNode->workerName,
+						 "synchronous:=true);",
+						 workerNode->workerName,
 						 nodePort),
 				 errdetail("Citus uses the first worker node in the "
 						   "metadata for certain internal operations when "
@ -696,8 +695,7 @@ citus_set_node_property(PG_FUNCTION_ARGS)
else else
{ {
ereport(ERROR, (errmsg( ereport(ERROR, (errmsg(
"only the 'shouldhaveshards' property can be set using this function" "only the 'shouldhaveshards' property can be set using this function")));
)));
} }
TransactionModifiedNodeMetadata = true; TransactionModifiedNodeMetadata = true;
@@ -1178,6 +1176,81 @@ LockShardsInWorkerPlacementList(WorkerNode *workerNode, LOCKMODE lockMode)
 }
 
+
+BackgroundWorkerHandle *
+CheckBackgroundWorkerToObtainLocks(int32 lock_cooldown)
+{
+	BackgroundWorkerHandle *handle = NULL;
+	handle = StartLockAcquireHelperBackgroundWorker(MyProcPid, lock_cooldown);
+	if (!handle)
+	{
+		/*
+		 * We failed to start a background worker, which probably means that we exceeded
+		 * max_worker_processes, and this is unlikely to be resolved by retrying. We do not want
+		 * to repeatedly throw an error because if citus_update_node is called to complete a
+		 * failover then finishing is the only way to bring the cluster back up. Therefore we
+		 * give up on killing other backends and simply wait for the lock. We do set
+		 * lock_timeout to lock_cooldown, because we don't want to wait forever to get a lock.
+		 */
+		SetLockTimeoutLocally(lock_cooldown);
+		ereport(WARNING, (errmsg(
+							  "could not start background worker to kill backends with conflicting"
+							  " locks to force the update. Degrading to acquiring locks "
+							  "with a lock time out."),
+						  errhint(
+							  "Increasing max_worker_processes might help.")));
+	}
+	return handle;
+}
+
+
+/*
+ * This function is used to lock shards in a primary node.
+ * If force is true, we start a background worker to kill backends holding
+ * conflicting locks with this backend.
+ *
+ * If the node is a primary node we block reads and writes.
+ *
+ * This lock has two purposes:
+ *
+ * - Ensure buggy code in Citus doesn't cause failures when the
+ *   nodename/nodeport of a node changes mid-query
+ *
+ * - Provide fencing during failover, after this function returns all
+ *   connections will use the new node location.
+ *
+ * Drawback:
+ *
+ * - This function blocks until all previous queries have finished. This
+ *   means that long-running queries will prevent failover.
+ *
+ *   In case of node failure said long-running queries will fail in the end
+ *   anyway as they will be unable to commit successfully on the failed
+ *   machine. To cause quick failure of these queries use force => true
+ *   during the invocation of citus_update_node to terminate conflicting
+ *   backends proactively.
+ *
+ * It might be worth blocking reads to a secondary for the same reasons,
+ * though we currently only query secondaries on follower clusters
+ * where these locks will have no effect.
+ */
+BackgroundWorkerHandle *
+LockPlacementsWithBackgroundWorkersInPrimaryNode(WorkerNode *workerNode, bool force, int32
+												 lock_cooldown)
+{
+	BackgroundWorkerHandle *handle = NULL;
+	if (NodeIsPrimary(workerNode))
+	{
+		if (force)
+		{
+			handle = CheckBackgroundWorkerToObtainLocks(lock_cooldown);
+		}
+		LockShardsInWorkerPlacementList(workerNode, AccessExclusiveLock);
+	}
+	return handle;
+}
+
+
 /*
  * citus_update_node moves the requested node to a different nodename and nodeport. It
  * locks to ensure no queries are running concurrently; and is intended for customers who
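The degraded path in CheckBackgroundWorkerToObtainLocks relies on SetLockTimeoutLocally(lock_cooldown), which is defined elsewhere in this file and not shown in the diff. As a minimal sketch of the intent only, assuming it applies a transaction-local lock_timeout via PostgreSQL's set_config_option with GUC_ACTION_LOCAL (the function name below is hypothetical, not the Citus implementation):

```c
#include "postgres.h"

#include "utils/guc.h"

/*
 * Hypothetical sketch of a transaction-local lock_timeout override;
 * SetLockTimeoutLocally in this file is assumed to behave similarly.
 */
static void
SetLockTimeoutLocallySketch(int32 lockCooldown)
{
	char timeoutString[32];

	/* lock_cooldown is in milliseconds, matching lock_timeout's unit */
	snprintf(timeoutString, sizeof(timeoutString), "%d", lockCooldown);

	/* GUC_ACTION_LOCAL: the override is rolled back at transaction end */
	(void) set_config_option("lock_timeout", timeoutString,
							 PGC_USERSET, PGC_S_SESSION,
							 GUC_ACTION_LOCAL, true, 0, false);
}
```

With this, a session that cannot start the lock-helper worker still waits at most lock_cooldown milliseconds for each conflicting lock instead of blocking indefinitely.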
@@ -1243,63 +1316,8 @@ citus_update_node(PG_FUNCTION_ARGS)
 		EnsureTransactionalMetadataSyncMode();
 	}
 
-	/*
-	 * If the node is a primary node we block reads and writes.
-	 *
-	 * This lock has two purposes:
-	 *
-	 * - Ensure buggy code in Citus doesn't cause failures when the
-	 *   nodename/nodeport of a node changes mid-query
-	 *
-	 * - Provide fencing during failover, after this function returns all
-	 *   connections will use the new node location.
-	 *
-	 * Drawback:
-	 *
-	 * - This function blocks until all previous queries have finished. This
-	 *   means that long-running queries will prevent failover.
-	 *
-	 *   In case of node failure said long-running queries will fail in the end
-	 *   anyway as they will be unable to commit successfully on the failed
-	 *   machine. To cause quick failure of these queries use force => true
-	 *   during the invocation of citus_update_node to terminate conflicting
-	 *   backends proactively.
-	 *
-	 * It might be worth blocking reads to a secondary for the same reasons,
-	 * though we currently only query secondaries on follower clusters
-	 * where these locks will have no effect.
-	 */
-	if (NodeIsPrimary(workerNode))
-	{
-		/*
-		 * before acquiring the locks check if we want a background worker to help us to
-		 * aggressively obtain the locks.
-		 */
-		if (force)
-		{
-			handle = StartLockAcquireHelperBackgroundWorker(MyProcPid, lock_cooldown);
-			if (!handle)
-			{
-				/*
-				 * We failed to start a background worker, which probably means that we exceeded
-				 * max_worker_processes, and this is unlikely to be resolved by retrying. We do not want
-				 * to repeatedly throw an error because if citus_update_node is called to complete a
-				 * failover then finishing is the only way to bring the cluster back up. Therefore we
-				 * give up on killing other backends and simply wait for the lock. We do set
-				 * lock_timeout to lock_cooldown, because we don't want to wait forever to get a lock.
-				 */
-				SetLockTimeoutLocally(lock_cooldown);
-				ereport(WARNING, (errmsg(
-									  "could not start background worker to kill backends with conflicting"
-									  " locks to force the update. Degrading to acquiring locks "
-									  "with a lock time out."),
-								  errhint(
-									  "Increasing max_worker_processes might help.")));
-			}
-		}
-		LockShardsInWorkerPlacementList(workerNode, AccessExclusiveLock);
-	}
+	handle = LockPlacementsWithBackgroundWorkersInPrimaryNode(workerNode, force,
+															  lock_cooldown);
 
 	/*
 	 * if we have planned statements such as prepared statements, we should clear the cache so that
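The trailing context above leads into the plan-cache invalidation step of citus_update_node, which is also why utils/plancache.h is included at the top of this file. The exact Citus call is outside this diff; as a hedged sketch under that assumption, discarding cached plans after the node's connection information changes can be done with PostgreSQL's ResetPlanCache():

```c
#include "postgres.h"

#include "utils/plancache.h"

/*
 * Illustrative sketch (an assumption, not the code hidden behind this hunk):
 * once pg_dist_node carries the new nodename/nodeport, drop every cached
 * plan so prepared statements re-plan against the updated node.
 */
static void
InvalidateCachedPlansAfterNodeUpdate(void)
{
	ResetPlanCache();
}
```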
@@ -1358,6 +1376,8 @@ citus_pause_node_within_txn(PG_FUNCTION_ARGS)
 	CheckCitusVersion(ERROR);
 
 	int32 nodeId = PG_GETARG_INT32(0);
+	bool force = PG_GETARG_BOOL(1);
+	int32 lock_cooldown = PG_GETARG_INT32(2);
 
 	WorkerNode *workerNode = FindNodeAnyClusterByNodeId(nodeId);
 	if (workerNode == NULL)
@@ -1366,10 +1386,7 @@ citus_pause_node_within_txn(PG_FUNCTION_ARGS)
 						errmsg("node %u not found", nodeId)));
 	}
 
-	if (NodeIsPrimary(workerNode))
-	{
-		LockShardsInWorkerPlacementList(workerNode, AccessExclusiveLock);
-	}
+	LockPlacementsWithBackgroundWorkersInPrimaryNode(workerNode, force, lock_cooldown);
 
 	PG_RETURN_VOID();
 }
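Because the changes to citus_pause_node_within_txn are split across the two hunks above, the following consolidated sketch shows how the function reads after this commit. It is assembled from the fragments in the diff; the errcode in the error report is an assumption added for completeness, since the diff only shows the errmsg line.

```c
Datum
citus_pause_node_within_txn(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	int32 nodeId = PG_GETARG_INT32(0);
	bool force = PG_GETARG_BOOL(1);
	int32 lock_cooldown = PG_GETARG_INT32(2);

	WorkerNode *workerNode = FindNodeAnyClusterByNodeId(nodeId);
	if (workerNode == NULL)
	{
		/* errcode is assumed; the diff only shows the errmsg line */
		ereport(ERROR, (errcode(ERRCODE_NO_DATA_FOUND),
						errmsg("node %u not found", nodeId)));
	}

	/* blocks reads and writes on a primary; may start a lock-helper worker */
	LockPlacementsWithBackgroundWorkersInPrimaryNode(workerNode, force, lock_cooldown);

	PG_RETURN_VOID();
}
```

The pause path now shares the same locking helper as citus_update_node, so the force and lock_cooldown semantics described in the comment on LockPlacementsWithBackgroundWorkersInPrimaryNode apply to both entry points.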
@@ -1992,7 +2009,8 @@ ErrorIfNodeContainsNonRemovablePlacements(WorkerNode *workerNode)
 			ereport(ERROR, (errmsg("cannot remove or disable the node "
 								   "%s:%d because because it contains "
 								   "the only shard placement for "
-								   "shard " UINT64_FORMAT, workerNode->workerName,
+								   "shard " UINT64_FORMAT,
+								   workerNode->workerName,
 								   workerNode->workerPort, placement->shardId),
 							errdetail("One of the table(s) that prevents the operation "
 									  "complete successfully is %s",
@@ -2544,7 +2562,8 @@ ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value, char *fi
 	if (!valueBool && workerNode->groupId == COORDINATOR_GROUP_ID)
 	{
 		ereport(ERROR, (errmsg("cannot change \"%s\" field of the "
-							   "coordinator node", field)));
+							   "coordinator node",
+							   field)));
 	}
 }