From 88bfd2e4b74882f0e0e916552be27333e9192d83 Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Tue, 3 Mar 2020 10:45:02 +0300 Subject: [PATCH] refactor around local group id checks Mostyl optimizes the calls made to GetLocalGroupId and refactors its usages --- src/backend/distributed/commands/foreign_constraint.c | 3 ++- src/backend/distributed/commands/truncate.c | 4 +++- src/backend/distributed/executor/adaptive_executor.c | 4 +++- src/backend/distributed/executor/citus_custom_scan.c | 5 ++++- src/backend/distributed/executor/local_executor.c | 2 +- src/backend/distributed/master/master_node_protocol.c | 2 +- src/backend/distributed/metadata/metadata_sync.c | 2 +- src/backend/distributed/metadata/node_metadata.c | 2 +- .../distributed/planner/function_call_delegation.c | 4 ++-- src/backend/distributed/transaction/backend_data.c | 8 +++++--- .../distributed/transaction/citus_dist_stat_activity.c | 6 ++++-- .../transaction/distributed_deadlock_detection.c | 10 ++++++++-- src/backend/distributed/transaction/lock_graph.c | 4 ++-- .../distributed/transaction/remote_transaction.c | 2 +- .../distributed/transaction/transaction_recovery.c | 2 +- .../distributed/transaction/worker_transaction.c | 5 +++-- .../distributed/worker/worker_shard_visibility.c | 4 ++-- 17 files changed, 44 insertions(+), 25 deletions(-) diff --git a/src/backend/distributed/commands/foreign_constraint.c b/src/backend/distributed/commands/foreign_constraint.c index 0619df4ff..b1eab84df 100644 --- a/src/backend/distributed/commands/foreign_constraint.c +++ b/src/backend/distributed/commands/foreign_constraint.c @@ -31,7 +31,7 @@ #include "utils/ruleutils.h" #include "utils/syscache.h" - +/* Local functions forward declarations */ static bool HeapTupleOfForeignConstraintIncludesColumn(HeapTuple heapTuple, Oid relationId, int pgConstraintKey, char *columnName); @@ -173,6 +173,7 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis " or a reference table."))); } + /* set referenced table related variables here in an optimized way */ if (!selfReferencingTable) { referencedDistMethod = PartitionMethod(referencedTableId); diff --git a/src/backend/distributed/commands/truncate.c b/src/backend/distributed/commands/truncate.c index f147c7a21..466bb99bb 100644 --- a/src/backend/distributed/commands/truncate.c +++ b/src/backend/distributed/commands/truncate.c @@ -234,6 +234,8 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode) UseCoordinatedTransaction(); + int32 localGroupId = GetLocalGroupId(); + foreach_oid(relationId, relationIdList) { /* @@ -256,7 +258,7 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode) int nodePort = workerNode->workerPort; /* if local node is one of the targets, acquire the lock locally */ - if (workerNode->groupId == GetLocalGroupId()) + if (workerNode->groupId == localGroupId) { LockRelationOid(relationId, lockMode); continue; diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index e11277e72..bd673e9f4 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -1609,6 +1609,8 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution) List *taskList = execution->tasksToExecute; bool hasReturning = execution->hasReturning; + int32 localGroupId = GetLocalGroupId(); + Task *task = NULL; foreach_ptr(task, taskList) { @@ -1752,7 +1754,7 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution) } if (!TransactionConnectedToLocalGroup && taskPlacement->groupId == - GetLocalGroupId()) + localGroupId) { TransactionConnectedToLocalGroup = true; } diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 05c7bbd13..6590f7f29 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -532,10 +532,13 @@ GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan) { List *cachedPlanList = distributedPlan->workerJob->localPlannedStatements; LocalPlannedStatement *localPlannedStatement = NULL; + + int32 localGroupId = GetLocalGroupId(); + foreach_ptr(localPlannedStatement, cachedPlanList) { if (localPlannedStatement->shardId == task->anchorShardId && - localPlannedStatement->localGroupId == GetLocalGroupId()) + localPlannedStatement->localGroupId == localGroupId) { /* already have a cached plan, no need to continue */ return localPlannedStatement->localPlan; diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index f460b96aa..3451994c9 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -504,7 +504,7 @@ ShouldExecuteTasksLocally(List *taskList) bool TaskAccessesLocalNode(Task *task) { - int localGroupId = GetLocalGroupId(); + int32 localGroupId = GetLocalGroupId(); ShardPlacement *taskPlacement = NULL; foreach_ptr(taskPlacement, task->taskPlacementList) diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index c45eea7b9..6aa19f5bf 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -772,7 +772,7 @@ ShardStorageType(Oid relationId) bool IsCoordinator(void) { - return (GetLocalGroupId() == 0); + return (GetLocalGroupId() == COORDINATOR_GROUP_ID); } diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 36915580d..864d56adc 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -199,7 +199,7 @@ ClusterHasKnownMetadataWorkers() { bool workerWithMetadata = false; - if (GetLocalGroupId() != 0) + if (!IsCoordinator()) { workerWithMetadata = true; } diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 779c4362d..21d6baa63 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -1334,7 +1334,7 @@ GetNextNodeId() void EnsureCoordinator(void) { - int localGroupId = GetLocalGroupId(); + int32 localGroupId = GetLocalGroupId(); if (localGroupId != 0) { diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index 51e5d4bfd..aa5f415df 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -123,8 +123,8 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) return NULL; } - int32 groupId = GetLocalGroupId(); - if (groupId != 0 || groupId == GROUP_ID_UPGRADING) + int32 localGroupId = GetLocalGroupId(); + if (localGroupId != COORDINATOR_GROUP_ID || localGroupId == GROUP_ID_UPGRADING) { /* do not delegate from workers, or while upgrading */ return NULL; diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index c29f8a6cb..8cf3ea0bb 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -226,6 +226,8 @@ get_global_active_transactions(PG_FUNCTION_ARGS) /* add active transactions for local node */ StoreAllActiveTransactions(tupleStore, tupleDescriptor); + int32 localGroupId = GetLocalGroupId(); + /* open connections in parallel */ WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) @@ -234,7 +236,7 @@ get_global_active_transactions(PG_FUNCTION_ARGS) int nodePort = workerNode->workerPort; int connectionFlags = 0; - if (workerNode->groupId == GetLocalGroupId()) + if (workerNode->groupId == localGroupId) { /* we already get these transactions via GetAllActiveTransactions() */ continue; @@ -726,7 +728,7 @@ AssignDistributedTransactionId(void) &backendManagementShmemData->nextTransactionNumber; uint64 nextTransactionNumber = pg_atomic_fetch_add_u64(transactionNumberSequence, 1); - int localGroupId = GetLocalGroupId(); + int32 localGroupId = GetLocalGroupId(); TimestampTz currentTimestamp = GetCurrentTimestamp(); Oid userId = GetUserId(); @@ -758,7 +760,7 @@ MarkCitusInitiatedCoordinatorBackend(void) * GetLocalGroupId may throw exception which can cause leaving spin lock * unreleased. Calling GetLocalGroupId function before the lock to avoid this. */ - int localGroupId = GetLocalGroupId(); + int32 localGroupId = GetLocalGroupId(); SpinLockAcquire(&MyBackendData->mutex); diff --git a/src/backend/distributed/transaction/citus_dist_stat_activity.c b/src/backend/distributed/transaction/citus_dist_stat_activity.c index 1af7e2347..93302a798 100644 --- a/src/backend/distributed/transaction/citus_dist_stat_activity.c +++ b/src/backend/distributed/transaction/citus_dist_stat_activity.c @@ -329,6 +329,8 @@ CitusStatActivity(const char *statQuery) */ char *nodeUser = CurrentUserName(); + int32 localGroupId = GetLocalGroupId(); + /* open connections in parallel */ WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) @@ -337,7 +339,7 @@ CitusStatActivity(const char *statQuery) int nodePort = workerNode->workerPort; int connectionFlags = 0; - if (workerNode->groupId == GetLocalGroupId()) + if (workerNode->groupId == localGroupId) { /* we already get these stats via GetLocalNodeCitusDistStat() */ continue; @@ -432,7 +434,7 @@ GetLocalNodeCitusDistStat(const char *statQuery) return citusStatsList; } - int localGroupId = GetLocalGroupId(); + int32 localGroupId = GetLocalGroupId(); /* get the current worker's node stats */ List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); diff --git a/src/backend/distributed/transaction/distributed_deadlock_detection.c b/src/backend/distributed/transaction/distributed_deadlock_detection.c index 112aafc92..3fee30e62 100644 --- a/src/backend/distributed/transaction/distributed_deadlock_detection.c +++ b/src/backend/distributed/transaction/distributed_deadlock_detection.c @@ -105,7 +105,7 @@ CheckForDistributedDeadlocks(void) { HASH_SEQ_STATUS status; TransactionNode *transactionNode = NULL; - int localGroupId = GetLocalGroupId(); + int32 localGroupId = GetLocalGroupId(); List *workerNodeList = ActiveReadableNodeList(); /* @@ -368,6 +368,12 @@ ResetVisitedFields(HTAB *adjacencyList) static bool AssociateDistributedTransactionWithBackendProc(TransactionNode *transactionNode) { +#ifdef USE_ASSERT_CHECKING + + /* if assertions are disabled, it would give unused variable warning */ + int32 localGroupId = GetLocalGroupId(); +#endif + for (int backendIndex = 0; backendIndex < MaxBackends; ++backendIndex) { PGPROC *currentProc = &ProcGlobal->allProcs[backendIndex]; @@ -403,7 +409,7 @@ AssociateDistributedTransactionWithBackendProc(TransactionNode *transactionNode) } /* at the point we should only have transactions initiated by this node */ - Assert(currentTransactionId->initiatorNodeIdentifier == GetLocalGroupId()); + Assert(currentTransactionId->initiatorNodeIdentifier == localGroupId); transactionNode->initiatorProc = currentProc; diff --git a/src/backend/distributed/transaction/lock_graph.c b/src/backend/distributed/transaction/lock_graph.c index 417becd10..aed021ae0 100644 --- a/src/backend/distributed/transaction/lock_graph.c +++ b/src/backend/distributed/transaction/lock_graph.c @@ -93,7 +93,7 @@ BuildGlobalWaitGraph(void) List *workerNodeList = ActiveReadableNodeList(); char *nodeUser = CitusExtensionOwnerName(); List *connectionList = NIL; - int localNodeId = GetLocalGroupId(); + int32 localGroupId = GetLocalGroupId(); WaitGraph *waitGraph = BuildLocalWaitGraph(); @@ -105,7 +105,7 @@ BuildGlobalWaitGraph(void) int nodePort = workerNode->workerPort; int connectionFlags = 0; - if (workerNode->groupId == localNodeId) + if (workerNode->groupId == localGroupId) { /* we already have local wait edges */ continue; diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 0a2fd9d6b..2ffe1c45c 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -1356,7 +1356,7 @@ ParsePreparedTransactionName(char *preparedTransactionName, *groupId = strtol(currentCharPointer, NULL, 10); - if ((*groupId == 0 && errno == EINVAL) || + if ((*groupId == COORDINATOR_GROUP_ID && errno == EINVAL) || (*groupId == INT_MAX && errno == ERANGE)) { return false; diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index 132db008f..a401b2cee 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -396,7 +396,7 @@ PendingWorkerTransactionList(MultiConnection *connection) StringInfo command = makeStringInfo(); bool raiseInterrupts = true; List *transactionNames = NIL; - int coordinatorId = GetLocalGroupId(); + int32 coordinatorId = GetLocalGroupId(); appendStringInfo(command, "SELECT gid FROM pg_prepared_xacts " "WHERE gid LIKE 'citus\\_%d\\_%%'", diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index f8be9cd4e..343c4c67d 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -201,6 +201,8 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) List *workerNodeList = ActivePrimaryWorkerNodeList(lockMode); List *result = NIL; + int32 localGroupId = GetLocalGroupId(); + WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) { @@ -208,8 +210,7 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) { continue; } - if (targetWorkerSet == OTHER_WORKERS && - workerNode->groupId == GetLocalGroupId()) + if (targetWorkerSet == OTHER_WORKERS && workerNode->groupId == localGroupId) { continue; } diff --git a/src/backend/distributed/worker/worker_shard_visibility.c b/src/backend/distributed/worker/worker_shard_visibility.c index 58f818267..259d4a7e7 100644 --- a/src/backend/distributed/worker/worker_shard_visibility.c +++ b/src/backend/distributed/worker/worker_shard_visibility.c @@ -13,6 +13,7 @@ #include "catalog/namespace.h" #include "catalog/pg_class.h" #include "distributed/metadata_cache.h" +#include "distributed/master_protocol.h" #include "distributed/worker_protocol.h" #include "distributed/worker_shard_visibility.h" #include "nodes/nodeFuncs.h" @@ -116,8 +117,7 @@ RelationIsAKnownShard(Oid shardRelationId, bool onlySearchPath) return false; } - int localGroupId = GetLocalGroupId(); - if (localGroupId == 0) + if (IsCoordinator()) { bool coordinatorIsKnown = false; PrimaryNodeForGroup(0, &coordinatorIsKnown);