refactor around local group id checks

Mostly optimizes the calls made to GetLocalGroupId and refactors
its usages
pull/3563/head
Onur Tirtir 2020-03-03 10:45:02 +03:00
parent 1e128a6ee4
commit 88bfd2e4b7
17 changed files with 44 additions and 25 deletions
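
The recurring change in this commit is hoisting the GetLocalGroupId() call out
of loop bodies into a local variable, so it is evaluated once per function
rather than once per iteration. A minimal sketch of that shape, using
illustrative stand-in names (WorkerNodeEntry, ProcessWorkerNodes) rather than
the actual Citus call sites:

#include <stdint.h>

typedef int32_t int32;

/* Citus helper returning this node's group id; declared only to keep the sketch self-contained */
extern int32 GetLocalGroupId(void);

typedef struct WorkerNodeEntry
{
	int32 groupId;
} WorkerNodeEntry;

static void
ProcessWorkerNodes(WorkerNodeEntry *workerNodes, int workerNodeCount)
{
	/* hoisted: one call up front instead of one per loop iteration */
	int32 localGroupId = GetLocalGroupId();

	for (int nodeIndex = 0; nodeIndex < workerNodeCount; nodeIndex++)
	{
		if (workerNodes[nodeIndex].groupId == localGroupId)
		{
			/* the local node is handled separately at these call sites */
			continue;
		}

		/* ... act on the remote node ... */
	}
}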

View File

@@ -31,7 +31,7 @@
#include "utils/ruleutils.h"
#include "utils/syscache.h"
/* Local functions forward declarations */
static bool HeapTupleOfForeignConstraintIncludesColumn(HeapTuple heapTuple, Oid
relationId, int pgConstraintKey,
char *columnName);
@@ -173,6 +173,7 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
" or a reference table.")));
}
+ /* set referenced table related variables here in an optimized way */
if (!selfReferencingTable)
{
referencedDistMethod = PartitionMethod(referencedTableId);

View File

@@ -234,6 +234,8 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
UseCoordinatedTransaction();
+ int32 localGroupId = GetLocalGroupId();
foreach_oid(relationId, relationIdList)
{
/*
@@ -256,7 +258,7 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
int nodePort = workerNode->workerPort;
/* if local node is one of the targets, acquire the lock locally */
- if (workerNode->groupId == GetLocalGroupId())
+ if (workerNode->groupId == localGroupId)
{
LockRelationOid(relationId, lockMode);
continue;

View File

@@ -1609,6 +1609,8 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
List *taskList = execution->tasksToExecute;
bool hasReturning = execution->hasReturning;
+ int32 localGroupId = GetLocalGroupId();
Task *task = NULL;
foreach_ptr(task, taskList)
{
@@ -1752,7 +1754,7 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
}
if (!TransactionConnectedToLocalGroup && taskPlacement->groupId ==
- GetLocalGroupId())
+ localGroupId)
{
TransactionConnectedToLocalGroup = true;
}

View File

@@ -532,10 +532,13 @@ GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan)
{
List *cachedPlanList = distributedPlan->workerJob->localPlannedStatements;
LocalPlannedStatement *localPlannedStatement = NULL;
+ int32 localGroupId = GetLocalGroupId();
foreach_ptr(localPlannedStatement, cachedPlanList)
{
if (localPlannedStatement->shardId == task->anchorShardId &&
- localPlannedStatement->localGroupId == GetLocalGroupId())
+ localPlannedStatement->localGroupId == localGroupId)
{
/* already have a cached plan, no need to continue */
return localPlannedStatement->localPlan;

View File

@@ -504,7 +504,7 @@ ShouldExecuteTasksLocally(List *taskList)
bool
TaskAccessesLocalNode(Task *task)
{
- int localGroupId = GetLocalGroupId();
+ int32 localGroupId = GetLocalGroupId();
ShardPlacement *taskPlacement = NULL;
foreach_ptr(taskPlacement, task->taskPlacementList)

View File

@@ -772,7 +772,7 @@ ShardStorageType(Oid relationId)
bool
IsCoordinator(void)
{
- return (GetLocalGroupId() == 0);
+ return (GetLocalGroupId() == COORDINATOR_GROUP_ID);
}
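
This hunk and the next replace the magic number 0 with the named
COORDINATOR_GROUP_ID constant and route worker-side checks through
IsCoordinator(), so the "group 0 is the coordinator" rule is encoded in one
place. A condensed sketch of that relationship; the declarations are repeated
only to keep the snippet self-contained, and the constant's value is taken
from the original comparison in the diff:

#include <stdbool.h>
#include <stdint.h>

typedef int32_t int32;

/* per the original comparison, the coordinator's group id is 0 */
#define COORDINATOR_GROUP_ID 0

extern int32 GetLocalGroupId(void);

static bool
IsCoordinator(void)
{
	return (GetLocalGroupId() == COORDINATOR_GROUP_ID);
}

/* illustrative caller, not a Citus function: the intent now reads directly */
static bool
RunningOnWorker(void)
{
	return !IsCoordinator();
}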

View File

@@ -199,7 +199,7 @@ ClusterHasKnownMetadataWorkers()
{
bool workerWithMetadata = false;
- if (GetLocalGroupId() != 0)
+ if (!IsCoordinator())
{
workerWithMetadata = true;
}

View File

@@ -1334,7 +1334,7 @@ GetNextNodeId()
void
EnsureCoordinator(void)
{
- int localGroupId = GetLocalGroupId();
+ int32 localGroupId = GetLocalGroupId();
if (localGroupId != 0)
{

View File

@@ -123,8 +123,8 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
return NULL;
}
- int32 groupId = GetLocalGroupId();
- if (groupId != 0 || groupId == GROUP_ID_UPGRADING)
+ int32 localGroupId = GetLocalGroupId();
+ if (localGroupId != COORDINATOR_GROUP_ID || localGroupId == GROUP_ID_UPGRADING)
{
/* do not delegate from workers, or while upgrading */
return NULL;

View File

@@ -226,6 +226,8 @@ get_global_active_transactions(PG_FUNCTION_ARGS)
/* add active transactions for local node */
StoreAllActiveTransactions(tupleStore, tupleDescriptor);
+ int32 localGroupId = GetLocalGroupId();
/* open connections in parallel */
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
@@ -234,7 +236,7 @@ get_global_active_transactions(PG_FUNCTION_ARGS)
int nodePort = workerNode->workerPort;
int connectionFlags = 0;
- if (workerNode->groupId == GetLocalGroupId())
+ if (workerNode->groupId == localGroupId)
{
/* we already get these transactions via GetAllActiveTransactions() */
continue;
@@ -726,7 +728,7 @@ AssignDistributedTransactionId(void)
&backendManagementShmemData->nextTransactionNumber;
uint64 nextTransactionNumber = pg_atomic_fetch_add_u64(transactionNumberSequence, 1);
- int localGroupId = GetLocalGroupId();
+ int32 localGroupId = GetLocalGroupId();
TimestampTz currentTimestamp = GetCurrentTimestamp();
Oid userId = GetUserId();
@@ -758,7 +760,7 @@ MarkCitusInitiatedCoordinatorBackend(void)
* GetLocalGroupId may throw exception which can cause leaving spin lock
* unreleased. Calling GetLocalGroupId function before the lock to avoid this.
*/
- int localGroupId = GetLocalGroupId();
+ int32 localGroupId = GetLocalGroupId();
SpinLockAcquire(&MyBackendData->mutex);
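
The comment in this hunk encodes a real ordering hazard: GetLocalGroupId() can
raise a PostgreSQL ERROR, and an error thrown between SpinLockAcquire() and
SpinLockRelease() would leave the spinlock held forever, since spinlocks are
not released by transaction abort. Anything that can throw must therefore run
before the lock is taken. A minimal sketch of the rule, assuming a simplified
stand-in for the backend data struct:

#include "postgres.h"
#include "storage/spin.h"

extern int32 GetLocalGroupId(void);	/* may ereport(ERROR) */

typedef struct SketchBackendData
{
	slock_t mutex;
	int32 groupId;
} SketchBackendData;

static void
SetBackendGroupId(SketchBackendData *backendData)
{
	/* evaluate the call that can throw before acquiring the spinlock */
	int32 localGroupId = GetLocalGroupId();

	SpinLockAcquire(&backendData->mutex);

	/* only trivial, no-fail assignments while the spinlock is held */
	backendData->groupId = localGroupId;

	SpinLockRelease(&backendData->mutex);
}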

View File

@@ -329,6 +329,8 @@ CitusStatActivity(const char *statQuery)
*/
char *nodeUser = CurrentUserName();
+ int32 localGroupId = GetLocalGroupId();
/* open connections in parallel */
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
@@ -337,7 +339,7 @@ CitusStatActivity(const char *statQuery)
int nodePort = workerNode->workerPort;
int connectionFlags = 0;
- if (workerNode->groupId == GetLocalGroupId())
+ if (workerNode->groupId == localGroupId)
{
/* we already get these stats via GetLocalNodeCitusDistStat() */
continue;
@@ -432,7 +434,7 @@ GetLocalNodeCitusDistStat(const char *statQuery)
return citusStatsList;
}
- int localGroupId = GetLocalGroupId();
+ int32 localGroupId = GetLocalGroupId();
/* get the current worker's node stats */
List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock);

View File

@@ -105,7 +105,7 @@ CheckForDistributedDeadlocks(void)
{
HASH_SEQ_STATUS status;
TransactionNode *transactionNode = NULL;
- int localGroupId = GetLocalGroupId();
+ int32 localGroupId = GetLocalGroupId();
List *workerNodeList = ActiveReadableNodeList();
/*
@@ -368,6 +368,12 @@ ResetVisitedFields(HTAB *adjacencyList)
static bool
AssociateDistributedTransactionWithBackendProc(TransactionNode *transactionNode)
{
+ #ifdef USE_ASSERT_CHECKING
+ /* if assertions are disabled, it would give unused variable warning */
+ int32 localGroupId = GetLocalGroupId();
+ #endif
for (int backendIndex = 0; backendIndex < MaxBackends; ++backendIndex)
{
PGPROC *currentProc = &ProcGlobal->allProcs[backendIndex];
@@ -403,7 +409,7 @@ AssociateDistributedTransactionWithBackendProc(TransactionNode *transactionNode)
}
/* at the point we should only have transactions initiated by this node */
- Assert(currentTransactionId->initiatorNodeIdentifier == GetLocalGroupId());
+ Assert(currentTransactionId->initiatorNodeIdentifier == localGroupId);
transactionNode->initiatorProc = currentProc;
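
The #ifdef USE_ASSERT_CHECKING guard above is needed because the hoisted
localGroupId is consumed only by the Assert(): in builds without assertions,
PostgreSQL's Assert() discards its argument entirely, so an unconditional
declaration would draw an unused-variable warning. The pattern in isolation,
with an illustrative function name:

#include "postgres.h"

extern int32 GetLocalGroupId(void);

static void
CheckInitiatedLocally(int32 initiatorNodeIdentifier)
{
#ifdef USE_ASSERT_CHECKING
	/* referenced only by the Assert below; hidden from non-assert builds */
	int32 localGroupId = GetLocalGroupId();
#endif

	/* Assert() drops its argument when assertions are disabled, so this compiles either way */
	Assert(initiatorNodeIdentifier == localGroupId);
}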

View File

@@ -93,7 +93,7 @@ BuildGlobalWaitGraph(void)
List *workerNodeList = ActiveReadableNodeList();
char *nodeUser = CitusExtensionOwnerName();
List *connectionList = NIL;
- int localNodeId = GetLocalGroupId();
+ int32 localGroupId = GetLocalGroupId();
WaitGraph *waitGraph = BuildLocalWaitGraph();
@@ -105,7 +105,7 @@ BuildGlobalWaitGraph(void)
int nodePort = workerNode->workerPort;
int connectionFlags = 0;
- if (workerNode->groupId == localNodeId)
+ if (workerNode->groupId == localGroupId)
{
/* we already have local wait edges */
continue;

View File

@@ -1356,7 +1356,7 @@ ParsePreparedTransactionName(char *preparedTransactionName,
*groupId = strtol(currentCharPointer, NULL, 10);
- if ((*groupId == 0 && errno == EINVAL) ||
+ if ((*groupId == COORDINATOR_GROUP_ID && errno == EINVAL) ||
(*groupId == INT_MAX && errno == ERANGE))
{
return false;

View File

@@ -396,7 +396,7 @@ PendingWorkerTransactionList(MultiConnection *connection)
StringInfo command = makeStringInfo();
bool raiseInterrupts = true;
List *transactionNames = NIL;
- int coordinatorId = GetLocalGroupId();
+ int32 coordinatorId = GetLocalGroupId();
appendStringInfo(command, "SELECT gid FROM pg_prepared_xacts "
"WHERE gid LIKE 'citus\\_%d\\_%%'",

View File

@@ -201,6 +201,8 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
List *workerNodeList = ActivePrimaryWorkerNodeList(lockMode);
List *result = NIL;
+ int32 localGroupId = GetLocalGroupId();
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList)
{
@@ -208,8 +210,7 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
{
continue;
}
- if (targetWorkerSet == OTHER_WORKERS &&
- workerNode->groupId == GetLocalGroupId())
+ if (targetWorkerSet == OTHER_WORKERS && workerNode->groupId == localGroupId)
{
continue;
}

View File

@@ -13,6 +13,7 @@
#include "catalog/namespace.h"
#include "catalog/pg_class.h"
#include "distributed/metadata_cache.h"
+ #include "distributed/master_protocol.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_shard_visibility.h"
#include "nodes/nodeFuncs.h"
@@ -116,8 +117,7 @@ RelationIsAKnownShard(Oid shardRelationId, bool onlySearchPath)
return false;
}
- int localGroupId = GetLocalGroupId();
- if (localGroupId == 0)
+ if (IsCoordinator())
{
bool coordinatorIsKnown = false;
PrimaryNodeForGroup(0, &coordinatorIsKnown);