diff --git a/src/backend/distributed/commands/foreign_constraint.c b/src/backend/distributed/commands/foreign_constraint.c index 6d6abe07c..6a3923d1a 100644 --- a/src/backend/distributed/commands/foreign_constraint.c +++ b/src/backend/distributed/commands/foreign_constraint.c @@ -31,7 +31,7 @@ #include "utils/ruleutils.h" #include "utils/syscache.h" - +/* Local functions forward declarations */ static bool HeapTupleOfForeignConstraintIncludesColumn(HeapTuple heapTuple, Oid relationId, int pgConstraintKey, char *columnName); @@ -96,22 +96,24 @@ ConstraintIsAForeignKeyToReferenceTable(char *constraintName, Oid relationId) /* - * ErrorIfUnsupportedForeignConstraintExists runs checks related to foreign constraints and - * errors out if it is not possible to create one of the foreign constraint in distributed - * environment. + * ErrorIfUnsupportedForeignConstraintExists runs checks related to foreign + * constraints and errors out if it is not possible to create one of the + * foreign constraint in distributed environment. * * To support foreign constraints, we require that; * - If referencing and referenced tables are hash-distributed * - Referencing and referenced tables are co-located. * - Foreign constraint is defined over distribution column. - * - ON DELETE/UPDATE SET NULL, ON DELETE/UPDATE SET DEFAULT and ON UPDATE CASCADE options + * - ON DELETE/UPDATE SET NULL, ON DELETE/UPDATE SET DEFAULT and + * ON UPDATE CASCADE options * are not used. * - Replication factors of referencing and referenced table are 1. * - If referenced table is a reference table - * - ON DELETE/UPDATE SET NULL, ON DELETE/UPDATE SET DEFAULT and ON UPDATE CASCADE options - * are not used on the distribution key of the referencing column. - * - If referencing table is a reference table, error out if the referenced table is not a - * a reference table. + * - ON DELETE/UPDATE SET NULL, ON DELETE/UPDATE SET DEFAULT and + * ON UPDATE CASCADE options are not used on the distribution key + * of the referencing column. + * - If referencing table is a reference table, error out if the referenced + * table is not a reference table. */ void ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDistMethod, @@ -122,12 +124,10 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis int scanKeyCount = 1; Oid referencingTableId = relation->rd_id; - Oid referencedTableId = InvalidOid; - uint32 referencedColocationId = INVALID_COLOCATION_ID; - bool selfReferencingTable = false; bool referencingNotReplicated = true; + bool referencingIsDistributed = IsDistributedTable(referencingTableId); - if (IsDistributedTable(referencingTableId)) + if (referencingIsDistributed) { /* ALTER TABLE command is applied over single replicated table */ referencingNotReplicated = SingleReplicatedTable(referencingTableId); @@ -150,21 +150,26 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis while (HeapTupleIsValid(heapTuple)) { Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple); + + int referencingAttrIndex = -1; + char referencedDistMethod = 0; Var *referencedDistKey = NULL; - int referencingAttrIndex = -1; int referencedAttrIndex = -1; + uint32 referencedColocationId = INVALID_COLOCATION_ID; + /* not a foreign key constraint, skip to next one */ if (constraintForm->contype != CONSTRAINT_FOREIGN) { heapTuple = systable_getnext(scanDescriptor); continue; } - referencedTableId = constraintForm->confrelid; - selfReferencingTable = (referencingTableId == referencedTableId); - + Oid referencedTableId = constraintForm->confrelid; bool referencedIsDistributed = IsDistributedTable(referencedTableId); + + bool selfReferencingTable = (referencingTableId == referencedTableId); + if (!referencedIsDistributed && !selfReferencingTable) { ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), @@ -173,6 +178,8 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis " or a reference table."))); } + /* set referenced table related variables here if table is referencing itself */ + if (!selfReferencingTable) { referencedDistMethod = PartitionMethod(referencedTableId); @@ -191,7 +198,6 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis bool referencingIsReferenceTable = (referencingDistMethod == DISTRIBUTE_BY_NONE); bool referencedIsReferenceTable = (referencedDistMethod == DISTRIBUTE_BY_NONE); - /* * We support foreign keys between reference tables. No more checks * are necessary. @@ -461,6 +467,7 @@ ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid relationId) /* clean up scan and close system catalog */ systable_endscan(scanDescriptor); heap_close(pgConstraint, AccessShareLock); + return foreignKeyToReferenceTableIncludesGivenColumn; } @@ -562,6 +569,7 @@ HasForeignKeyToReferenceTable(Oid relationId) if (!IsDistributedTable(referencedTableId)) { + heapTuple = systable_getnext(scanDescriptor); continue; } @@ -605,6 +613,7 @@ TableReferenced(Oid relationId) scanKeyCount, scanKey); HeapTuple heapTuple = systable_getnext(scanDescriptor); + while (HeapTupleIsValid(heapTuple)) { Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple); @@ -620,6 +629,8 @@ TableReferenced(Oid relationId) heapTuple = systable_getnext(scanDescriptor); } + /* clean up scan and close system catalog */ + systable_endscan(scanDescriptor); heap_close(pgConstraint, NoLock); diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index f4aad4f36..081266dc1 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -122,9 +122,9 @@ PreprocessDropTableStmt(Node *node, const char *queryString) /* - * PostprocessCreateTableStmtPartitionOf takes CreateStmt object as a parameter but - * it only processes CREATE TABLE ... PARTITION OF statements and it checks if - * user creates the table as a partition of a distributed table. In that case, + * PostprocessCreateTableStmtPartitionOf takes CreateStmt object as a parameter + * but it only processes CREATE TABLE ... PARTITION OF statements and it checks + * if user creates the table as a partition of a distributed table. In that case, * it distributes partition as well. Since the table itself is a partition, * CreateDistributedTable will attach it to its parent table automatically after * distributing it. @@ -173,9 +173,10 @@ PostprocessCreateTableStmtPartitionOf(CreateStmt *createStatement, const /* - * PostprocessAlterTableStmtAttachPartition takes AlterTableStmt object as parameter - * but it only processes into ALTER TABLE ... ATTACH PARTITION commands and - * distributes the partition if necessary. There are four cases to consider; + * PostprocessAlterTableStmtAttachPartition takes AlterTableStmt object as + * parameter but it only processes into ALTER TABLE ... ATTACH PARTITION + * commands and distributes the partition if necessary. There are four cases + * to consider; * * Parent is not distributed, partition is not distributed: We do not need to * do anything in this case. @@ -250,9 +251,9 @@ PostprocessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement, /* - * PostprocessAlterTableSchemaStmt is executed after the change has been applied locally, we - * can now use the new dependencies of the table to ensure all its dependencies exist on - * the workers before we apply the commands remotely. + * PostprocessAlterTableSchemaStmt is executed after the change has been applied + * locally, we can now use the new dependencies of the table to ensure all its + * dependencies exist on the workers before we apply the commands remotely. */ List * PostprocessAlterTableSchemaStmt(Node *node, const char *queryString) @@ -274,19 +275,18 @@ PostprocessAlterTableSchemaStmt(Node *node, const char *queryString) /* - * PreprocessAlterTableStmt determines whether a given ALTER TABLE statement involves - * a distributed table. If so (and if the statement does not use unsupported - * options), it modifies the input statement to ensure proper execution against - * the master node table and creates a DDLJob to encapsulate information needed - * during the worker node portion of DDL execution before returning that DDLJob - * in a List. If no distributed table is involved, this function returns NIL. + * PreprocessAlterTableStmt determines whether a given ALTER TABLE statement + * involves a distributed table. If so (and if the statement does not use + * unsupported options), it modifies the input statement to ensure proper + * execution against the master node table and creates a DDLJob to encapsulate + * information needed during the worker node portion of DDL execution before + * returning that DDLJob in a List. If no distributed table is involved, this + * function returns NIL. */ List * PreprocessAlterTableStmt(Node *node, const char *alterTableCommand) { AlterTableStmt *alterTableStatement = castNode(AlterTableStmt, node); - Oid rightRelationId = InvalidOid; - bool executeSequentially = false; /* first check whether a distributed relation is affected */ if (alterTableStatement->relation == NULL) @@ -296,6 +296,7 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand) LOCKMODE lockmode = AlterTableGetLockLevel(alterTableStatement->cmds); Oid leftRelationId = AlterTableLookupRelation(alterTableStatement, lockmode); + if (!OidIsValid(leftRelationId)) { return NIL; @@ -309,11 +310,12 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand) char leftRelationKind = get_rel_relkind(leftRelationId); if (leftRelationKind == RELKIND_INDEX) { - leftRelationId = IndexGetRelation(leftRelationId, false); + bool missingOk = false; + leftRelationId = IndexGetRelation(leftRelationId, missingOk); } - bool isDistributedRelation = IsDistributedTable(leftRelationId); - if (!isDistributedRelation) + bool referencingIsLocalTable = !IsDistributedTable(leftRelationId); + if (referencingIsLocalTable) { return NIL; } @@ -334,13 +336,19 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand) ErrorIfUnsupportedAlterTableStmt(alterTableStatement); } + /* these will be set in below loop according to subcommands */ + Oid rightRelationId = InvalidOid; + bool executeSequentially = false; + /* - * We check if there is a ADD/DROP FOREIGN CONSTRAINT command in sub commands list. - * If there is we assign referenced relation id to rightRelationId and we also - * set skip_validation to true to prevent PostgreSQL to verify validity of the - * foreign constraint in master. Validity will be checked in workers anyway. + * We check if there is a ADD/DROP FOREIGN CONSTRAINT command in sub commands + * list. If there is we assign referenced relation id to rightRelationId and + * we also set skip_validation to true to prevent PostgreSQL to verify validity + * of the foreign constraint in master. Validity will be checked in workers + * anyway. */ List *commandList = alterTableStatement->cmds; + AlterTableCmd *command = NULL; foreach_ptr(command, commandList) { @@ -438,6 +446,10 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand) rightRelationId = RangeVarGetRelid(partitionCommand->name, NoLock, false); } + /* + * We check and set the execution mode only if we fall into either of first two + * conditional blocks, otherwise we already continue the loop + */ executeSequentially |= SetupExecutionModeForAlterTable(leftRelationId, command); } @@ -447,14 +459,16 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand) SetLocalMultiShardModifyModeToSequential(); } + /* fill them here as it is possible to use them in some condtional blocks below */ DDLJob *ddlJob = palloc0(sizeof(DDLJob)); ddlJob->targetRelationId = leftRelationId; ddlJob->concurrentIndexCmd = false; ddlJob->commandString = alterTableCommand; - if (rightRelationId) + if (OidIsValid(rightRelationId)) { - if (!IsDistributedTable(rightRelationId)) + bool referencedIsLocalTable = !IsDistributedTable(rightRelationId); + if (referencedIsLocalTable) { ddlJob->taskList = NIL; } @@ -496,10 +510,11 @@ PreprocessAlterTableMoveAllStmt(Node *node, const char *queryString) /* - * PreprocessAlterTableSchemaStmt is executed before the statement is applied to the local - * postgres instance. + * PreprocessAlterTableSchemaStmt is executed before the statement is applied + * to the local postgres instance. * - * In this stage we can prepare the commands that will alter the schemas of the shards. + * In this stage we can prepare the commands that will alter the schemas of the + * shards. */ List * PreprocessAlterTableSchemaStmt(Node *node, const char *queryString) @@ -664,11 +679,11 @@ ErrorIfAlterDropsPartitionColumn(AlterTableStmt *alterTableStatement) /* - * PostprocessAlterTableStmt runs after the ALTER TABLE command has already run on the - * master, so we are checking constraints over the table with constraints already defined - * (to make the constraint check process same for ALTER TABLE and CREATE TABLE). If - * constraints do not fulfill the rules we defined, they will be removed and the table - * will return back to the state before the ALTER TABLE command. + * PostprocessAlterTableStmt runs after the ALTER TABLE command has already run + * on the master, so we are checking constraints over the table with constraints + * already defined (to make the constraint check process same for ALTER TABLE and + * CREATE TABLE). If constraints do not fulfill the rules we defined, they will be + * removed and the table will return back to the state before the ALTER TABLE command. */ void PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement) @@ -845,11 +860,11 @@ ErrorIfUnsupportedConstraint(Relation relation, char distributionMethod, Var *distributionColumn, uint32 colocationId) { /* - * We first perform check for foreign constraints. It is important to do this check - * before next check, because other types of constraints are allowed on reference - * tables and we return early for those constraints thanks to next check. Therefore, - * for reference tables, we first check for foreing constraints and if they are OK, - * we do not error out for other types of constraints. + * We first perform check for foreign constraints. It is important to do this + * check before next check, because other types of constraints are allowed on + * reference tables and we return early for those constraints thanks to next + * check. Therefore, for reference tables, we first check for foreign constraints + * and if they are OK, we do not error out for other types of constraints. */ ErrorIfUnsupportedForeignConstraintExists(relation, distributionMethod, distributionColumn, @@ -929,13 +944,14 @@ ErrorIfUnsupportedConstraint(Relation relation, char distributionMethod, if (!hasDistributionColumn) { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot create constraint on \"%s\"", - relationName), - errdetail("Distributed relations cannot have UNIQUE, " - "EXCLUDE, or PRIMARY KEY constraints that do not " - "include the partition column (with an equality " - "operator if EXCLUDE)."))); + ereport(ERROR, ( + errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot create constraint on \"%s\"", + relationName), + errdetail("Distributed relations cannot have UNIQUE, " + "EXCLUDE, or PRIMARY KEY constraints that do not " + "include the partition column (with an equality " + "operator if EXCLUDE)."))); } index_close(indexDesc, NoLock); @@ -1434,13 +1450,13 @@ ErrorIfUnsupportedAlterAddConstraintStmt(AlterTableStmt *alterTableStatement) /* - * AlterTableSchemaStmtObjectAddress returns the ObjectAddress of the table that is the - * object of the AlterObjectSchemaStmt. + * AlterTableSchemaStmtObjectAddress returns the ObjectAddress of the table that + * is the object of the AlterObjectSchemaStmt. * - * This could be called both before or after it has been applied locally. It will look in - * the old schema first, if the table cannot be found in that schema it will look in the - * new schema. Errors if missing_ok is false and the table cannot be found in either of the - * schemas. + * This could be called both before or after it has been applied locally. It will + * look in the old schema first, if the table cannot be found in that schema it + * will look in the new schema. Errors if missing_ok is false and the table cannot + * be found in either of the schemas. */ ObjectAddress AlterTableSchemaStmtObjectAddress(Node *node, bool missing_ok) @@ -1469,10 +1485,12 @@ AlterTableSchemaStmtObjectAddress(Node *node, bool missing_ok) if (!missing_ok && tableOid == InvalidOid) { + const char *quotedTableName = + quote_qualified_identifier(stmt->relation->schemaname, tableName); + ereport(ERROR, (errcode(ERRCODE_UNDEFINED_TABLE), errmsg("relation \"%s\" does not exist", - quote_qualified_identifier(stmt->relation->schemaname, - tableName)))); + quotedTableName))); } } diff --git a/src/backend/distributed/commands/truncate.c b/src/backend/distributed/commands/truncate.c index f147c7a21..466bb99bb 100644 --- a/src/backend/distributed/commands/truncate.c +++ b/src/backend/distributed/commands/truncate.c @@ -234,6 +234,8 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode) UseCoordinatedTransaction(); + int32 localGroupId = GetLocalGroupId(); + foreach_oid(relationId, relationIdList) { /* @@ -256,7 +258,7 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode) int nodePort = workerNode->workerPort; /* if local node is one of the targets, acquire the lock locally */ - if (workerNode->groupId == GetLocalGroupId()) + if (workerNode->groupId == localGroupId) { LockRelationOid(relationId, lockMode); continue; diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index e11277e72..bd673e9f4 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -1609,6 +1609,8 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution) List *taskList = execution->tasksToExecute; bool hasReturning = execution->hasReturning; + int32 localGroupId = GetLocalGroupId(); + Task *task = NULL; foreach_ptr(task, taskList) { @@ -1752,7 +1754,7 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution) } if (!TransactionConnectedToLocalGroup && taskPlacement->groupId == - GetLocalGroupId()) + localGroupId) { TransactionConnectedToLocalGroup = true; } diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 05c7bbd13..6590f7f29 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -532,10 +532,13 @@ GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan) { List *cachedPlanList = distributedPlan->workerJob->localPlannedStatements; LocalPlannedStatement *localPlannedStatement = NULL; + + int32 localGroupId = GetLocalGroupId(); + foreach_ptr(localPlannedStatement, cachedPlanList) { if (localPlannedStatement->shardId == task->anchorShardId && - localPlannedStatement->localGroupId == GetLocalGroupId()) + localPlannedStatement->localGroupId == localGroupId) { /* already have a cached plan, no need to continue */ return localPlannedStatement->localPlan; diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index f460b96aa..3451994c9 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -504,7 +504,7 @@ ShouldExecuteTasksLocally(List *taskList) bool TaskAccessesLocalNode(Task *task) { - int localGroupId = GetLocalGroupId(); + int32 localGroupId = GetLocalGroupId(); ShardPlacement *taskPlacement = NULL; foreach_ptr(taskPlacement, task->taskPlacementList) diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index c45eea7b9..6aa19f5bf 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -772,7 +772,7 @@ ShardStorageType(Oid relationId) bool IsCoordinator(void) { - return (GetLocalGroupId() == 0); + return (GetLocalGroupId() == COORDINATOR_GROUP_ID); } diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 36915580d..864d56adc 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -199,7 +199,7 @@ ClusterHasKnownMetadataWorkers() { bool workerWithMetadata = false; - if (GetLocalGroupId() != 0) + if (!IsCoordinator()) { workerWithMetadata = true; } diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 779c4362d..21d6baa63 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -1334,7 +1334,7 @@ GetNextNodeId() void EnsureCoordinator(void) { - int localGroupId = GetLocalGroupId(); + int32 localGroupId = GetLocalGroupId(); if (localGroupId != 0) { diff --git a/src/backend/distributed/planner/function_call_delegation.c b/src/backend/distributed/planner/function_call_delegation.c index 51e5d4bfd..aa5f415df 100644 --- a/src/backend/distributed/planner/function_call_delegation.c +++ b/src/backend/distributed/planner/function_call_delegation.c @@ -123,8 +123,8 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext) return NULL; } - int32 groupId = GetLocalGroupId(); - if (groupId != 0 || groupId == GROUP_ID_UPGRADING) + int32 localGroupId = GetLocalGroupId(); + if (localGroupId != COORDINATOR_GROUP_ID || localGroupId == GROUP_ID_UPGRADING) { /* do not delegate from workers, or while upgrading */ return NULL; diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index c29f8a6cb..8cf3ea0bb 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -226,6 +226,8 @@ get_global_active_transactions(PG_FUNCTION_ARGS) /* add active transactions for local node */ StoreAllActiveTransactions(tupleStore, tupleDescriptor); + int32 localGroupId = GetLocalGroupId(); + /* open connections in parallel */ WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) @@ -234,7 +236,7 @@ get_global_active_transactions(PG_FUNCTION_ARGS) int nodePort = workerNode->workerPort; int connectionFlags = 0; - if (workerNode->groupId == GetLocalGroupId()) + if (workerNode->groupId == localGroupId) { /* we already get these transactions via GetAllActiveTransactions() */ continue; @@ -726,7 +728,7 @@ AssignDistributedTransactionId(void) &backendManagementShmemData->nextTransactionNumber; uint64 nextTransactionNumber = pg_atomic_fetch_add_u64(transactionNumberSequence, 1); - int localGroupId = GetLocalGroupId(); + int32 localGroupId = GetLocalGroupId(); TimestampTz currentTimestamp = GetCurrentTimestamp(); Oid userId = GetUserId(); @@ -758,7 +760,7 @@ MarkCitusInitiatedCoordinatorBackend(void) * GetLocalGroupId may throw exception which can cause leaving spin lock * unreleased. Calling GetLocalGroupId function before the lock to avoid this. */ - int localGroupId = GetLocalGroupId(); + int32 localGroupId = GetLocalGroupId(); SpinLockAcquire(&MyBackendData->mutex); diff --git a/src/backend/distributed/transaction/citus_dist_stat_activity.c b/src/backend/distributed/transaction/citus_dist_stat_activity.c index 1af7e2347..93302a798 100644 --- a/src/backend/distributed/transaction/citus_dist_stat_activity.c +++ b/src/backend/distributed/transaction/citus_dist_stat_activity.c @@ -329,6 +329,8 @@ CitusStatActivity(const char *statQuery) */ char *nodeUser = CurrentUserName(); + int32 localGroupId = GetLocalGroupId(); + /* open connections in parallel */ WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) @@ -337,7 +339,7 @@ CitusStatActivity(const char *statQuery) int nodePort = workerNode->workerPort; int connectionFlags = 0; - if (workerNode->groupId == GetLocalGroupId()) + if (workerNode->groupId == localGroupId) { /* we already get these stats via GetLocalNodeCitusDistStat() */ continue; @@ -432,7 +434,7 @@ GetLocalNodeCitusDistStat(const char *statQuery) return citusStatsList; } - int localGroupId = GetLocalGroupId(); + int32 localGroupId = GetLocalGroupId(); /* get the current worker's node stats */ List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); diff --git a/src/backend/distributed/transaction/distributed_deadlock_detection.c b/src/backend/distributed/transaction/distributed_deadlock_detection.c index 112aafc92..3fee30e62 100644 --- a/src/backend/distributed/transaction/distributed_deadlock_detection.c +++ b/src/backend/distributed/transaction/distributed_deadlock_detection.c @@ -105,7 +105,7 @@ CheckForDistributedDeadlocks(void) { HASH_SEQ_STATUS status; TransactionNode *transactionNode = NULL; - int localGroupId = GetLocalGroupId(); + int32 localGroupId = GetLocalGroupId(); List *workerNodeList = ActiveReadableNodeList(); /* @@ -368,6 +368,12 @@ ResetVisitedFields(HTAB *adjacencyList) static bool AssociateDistributedTransactionWithBackendProc(TransactionNode *transactionNode) { +#ifdef USE_ASSERT_CHECKING + + /* if assertions are disabled, it would give unused variable warning */ + int32 localGroupId = GetLocalGroupId(); +#endif + for (int backendIndex = 0; backendIndex < MaxBackends; ++backendIndex) { PGPROC *currentProc = &ProcGlobal->allProcs[backendIndex]; @@ -403,7 +409,7 @@ AssociateDistributedTransactionWithBackendProc(TransactionNode *transactionNode) } /* at the point we should only have transactions initiated by this node */ - Assert(currentTransactionId->initiatorNodeIdentifier == GetLocalGroupId()); + Assert(currentTransactionId->initiatorNodeIdentifier == localGroupId); transactionNode->initiatorProc = currentProc; diff --git a/src/backend/distributed/transaction/lock_graph.c b/src/backend/distributed/transaction/lock_graph.c index 417becd10..aed021ae0 100644 --- a/src/backend/distributed/transaction/lock_graph.c +++ b/src/backend/distributed/transaction/lock_graph.c @@ -93,7 +93,7 @@ BuildGlobalWaitGraph(void) List *workerNodeList = ActiveReadableNodeList(); char *nodeUser = CitusExtensionOwnerName(); List *connectionList = NIL; - int localNodeId = GetLocalGroupId(); + int32 localGroupId = GetLocalGroupId(); WaitGraph *waitGraph = BuildLocalWaitGraph(); @@ -105,7 +105,7 @@ BuildGlobalWaitGraph(void) int nodePort = workerNode->workerPort; int connectionFlags = 0; - if (workerNode->groupId == localNodeId) + if (workerNode->groupId == localGroupId) { /* we already have local wait edges */ continue; diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 0a2fd9d6b..2ffe1c45c 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -1356,7 +1356,7 @@ ParsePreparedTransactionName(char *preparedTransactionName, *groupId = strtol(currentCharPointer, NULL, 10); - if ((*groupId == 0 && errno == EINVAL) || + if ((*groupId == COORDINATOR_GROUP_ID && errno == EINVAL) || (*groupId == INT_MAX && errno == ERANGE)) { return false; diff --git a/src/backend/distributed/transaction/transaction_recovery.c b/src/backend/distributed/transaction/transaction_recovery.c index 132db008f..a401b2cee 100644 --- a/src/backend/distributed/transaction/transaction_recovery.c +++ b/src/backend/distributed/transaction/transaction_recovery.c @@ -396,7 +396,7 @@ PendingWorkerTransactionList(MultiConnection *connection) StringInfo command = makeStringInfo(); bool raiseInterrupts = true; List *transactionNames = NIL; - int coordinatorId = GetLocalGroupId(); + int32 coordinatorId = GetLocalGroupId(); appendStringInfo(command, "SELECT gid FROM pg_prepared_xacts " "WHERE gid LIKE 'citus\\_%d\\_%%'", diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index f8be9cd4e..343c4c67d 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -201,6 +201,8 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) List *workerNodeList = ActivePrimaryWorkerNodeList(lockMode); List *result = NIL; + int32 localGroupId = GetLocalGroupId(); + WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) { @@ -208,8 +210,7 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode) { continue; } - if (targetWorkerSet == OTHER_WORKERS && - workerNode->groupId == GetLocalGroupId()) + if (targetWorkerSet == OTHER_WORKERS && workerNode->groupId == localGroupId) { continue; } diff --git a/src/backend/distributed/worker/worker_shard_visibility.c b/src/backend/distributed/worker/worker_shard_visibility.c index 58f818267..259d4a7e7 100644 --- a/src/backend/distributed/worker/worker_shard_visibility.c +++ b/src/backend/distributed/worker/worker_shard_visibility.c @@ -13,6 +13,7 @@ #include "catalog/namespace.h" #include "catalog/pg_class.h" #include "distributed/metadata_cache.h" +#include "distributed/master_protocol.h" #include "distributed/worker_protocol.h" #include "distributed/worker_shard_visibility.h" #include "nodes/nodeFuncs.h" @@ -116,8 +117,7 @@ RelationIsAKnownShard(Oid shardRelationId, bool onlySearchPath) return false; } - int localGroupId = GetLocalGroupId(); - if (localGroupId == 0) + if (IsCoordinator()) { bool coordinatorIsKnown = false; PrimaryNodeForGroup(0, &coordinatorIsKnown);