Merge pull request #3563 from citusdata/refactor/local-group-id-and-fkey

Refactor around foreign key constraints and GetLocalGroupId
pull/3582/head
Onur Tirtir 2020-03-05 20:32:10 +03:00 committed by GitHub
commit c5007bc93c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 143 additions and 96 deletions

View File

@ -31,7 +31,7 @@
#include "utils/ruleutils.h" #include "utils/ruleutils.h"
#include "utils/syscache.h" #include "utils/syscache.h"
/* Local functions forward declarations */
static bool HeapTupleOfForeignConstraintIncludesColumn(HeapTuple heapTuple, Oid static bool HeapTupleOfForeignConstraintIncludesColumn(HeapTuple heapTuple, Oid
relationId, int pgConstraintKey, relationId, int pgConstraintKey,
char *columnName); char *columnName);
@ -96,22 +96,24 @@ ConstraintIsAForeignKeyToReferenceTable(char *constraintName, Oid relationId)
/* /*
* ErrorIfUnsupportedForeignConstraintExists runs checks related to foreign constraints and * ErrorIfUnsupportedForeignConstraintExists runs checks related to foreign
* errors out if it is not possible to create one of the foreign constraint in distributed * constraints and errors out if it is not possible to create one of the
* environment. * foreign constraint in distributed environment.
* *
* To support foreign constraints, we require that; * To support foreign constraints, we require that;
* - If referencing and referenced tables are hash-distributed * - If referencing and referenced tables are hash-distributed
* - Referencing and referenced tables are co-located. * - Referencing and referenced tables are co-located.
* - Foreign constraint is defined over distribution column. * - Foreign constraint is defined over distribution column.
* - ON DELETE/UPDATE SET NULL, ON DELETE/UPDATE SET DEFAULT and ON UPDATE CASCADE options * - ON DELETE/UPDATE SET NULL, ON DELETE/UPDATE SET DEFAULT and
* ON UPDATE CASCADE options
* are not used. * are not used.
* - Replication factors of referencing and referenced table are 1. * - Replication factors of referencing and referenced table are 1.
* - If referenced table is a reference table * - If referenced table is a reference table
* - ON DELETE/UPDATE SET NULL, ON DELETE/UPDATE SET DEFAULT and ON UPDATE CASCADE options * - ON DELETE/UPDATE SET NULL, ON DELETE/UPDATE SET DEFAULT and
* are not used on the distribution key of the referencing column. * ON UPDATE CASCADE options are not used on the distribution key
* - If referencing table is a reference table, error out if the referenced table is not a * of the referencing column.
* a reference table. * - If referencing table is a reference table, error out if the referenced
* table is not a reference table.
*/ */
void void
ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDistMethod, ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDistMethod,
@ -122,12 +124,10 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
int scanKeyCount = 1; int scanKeyCount = 1;
Oid referencingTableId = relation->rd_id; Oid referencingTableId = relation->rd_id;
Oid referencedTableId = InvalidOid;
uint32 referencedColocationId = INVALID_COLOCATION_ID;
bool selfReferencingTable = false;
bool referencingNotReplicated = true; bool referencingNotReplicated = true;
bool referencingIsDistributed = IsDistributedTable(referencingTableId);
if (IsDistributedTable(referencingTableId)) if (referencingIsDistributed)
{ {
/* ALTER TABLE command is applied over single replicated table */ /* ALTER TABLE command is applied over single replicated table */
referencingNotReplicated = SingleReplicatedTable(referencingTableId); referencingNotReplicated = SingleReplicatedTable(referencingTableId);
@ -150,21 +150,26 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
while (HeapTupleIsValid(heapTuple)) while (HeapTupleIsValid(heapTuple))
{ {
Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple); Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
int referencingAttrIndex = -1;
char referencedDistMethod = 0; char referencedDistMethod = 0;
Var *referencedDistKey = NULL; Var *referencedDistKey = NULL;
int referencingAttrIndex = -1;
int referencedAttrIndex = -1; int referencedAttrIndex = -1;
uint32 referencedColocationId = INVALID_COLOCATION_ID;
/* not a foreign key constraint, skip to next one */
if (constraintForm->contype != CONSTRAINT_FOREIGN) if (constraintForm->contype != CONSTRAINT_FOREIGN)
{ {
heapTuple = systable_getnext(scanDescriptor); heapTuple = systable_getnext(scanDescriptor);
continue; continue;
} }
referencedTableId = constraintForm->confrelid; Oid referencedTableId = constraintForm->confrelid;
selfReferencingTable = (referencingTableId == referencedTableId);
bool referencedIsDistributed = IsDistributedTable(referencedTableId); bool referencedIsDistributed = IsDistributedTable(referencedTableId);
bool selfReferencingTable = (referencingTableId == referencedTableId);
if (!referencedIsDistributed && !selfReferencingTable) if (!referencedIsDistributed && !selfReferencingTable)
{ {
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
@ -173,6 +178,8 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
" or a reference table."))); " or a reference table.")));
} }
/* set referenced table related variables here if table is referencing itself */
if (!selfReferencingTable) if (!selfReferencingTable)
{ {
referencedDistMethod = PartitionMethod(referencedTableId); referencedDistMethod = PartitionMethod(referencedTableId);
@ -191,7 +198,6 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
bool referencingIsReferenceTable = (referencingDistMethod == DISTRIBUTE_BY_NONE); bool referencingIsReferenceTable = (referencingDistMethod == DISTRIBUTE_BY_NONE);
bool referencedIsReferenceTable = (referencedDistMethod == DISTRIBUTE_BY_NONE); bool referencedIsReferenceTable = (referencedDistMethod == DISTRIBUTE_BY_NONE);
/* /*
* We support foreign keys between reference tables. No more checks * We support foreign keys between reference tables. No more checks
* are necessary. * are necessary.
@ -461,6 +467,7 @@ ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid relationId)
/* clean up scan and close system catalog */ /* clean up scan and close system catalog */
systable_endscan(scanDescriptor); systable_endscan(scanDescriptor);
heap_close(pgConstraint, AccessShareLock); heap_close(pgConstraint, AccessShareLock);
return foreignKeyToReferenceTableIncludesGivenColumn; return foreignKeyToReferenceTableIncludesGivenColumn;
} }
@ -562,6 +569,7 @@ HasForeignKeyToReferenceTable(Oid relationId)
if (!IsDistributedTable(referencedTableId)) if (!IsDistributedTable(referencedTableId))
{ {
heapTuple = systable_getnext(scanDescriptor);
continue; continue;
} }
@ -605,6 +613,7 @@ TableReferenced(Oid relationId)
scanKeyCount, scanKey); scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor); HeapTuple heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple)) while (HeapTupleIsValid(heapTuple))
{ {
Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple); Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
@ -620,6 +629,8 @@ TableReferenced(Oid relationId)
heapTuple = systable_getnext(scanDescriptor); heapTuple = systable_getnext(scanDescriptor);
} }
/* clean up scan and close system catalog */
systable_endscan(scanDescriptor); systable_endscan(scanDescriptor);
heap_close(pgConstraint, NoLock); heap_close(pgConstraint, NoLock);

View File

@ -122,9 +122,9 @@ PreprocessDropTableStmt(Node *node, const char *queryString)
/* /*
* PostprocessCreateTableStmtPartitionOf takes CreateStmt object as a parameter but * PostprocessCreateTableStmtPartitionOf takes CreateStmt object as a parameter
* it only processes CREATE TABLE ... PARTITION OF statements and it checks if * but it only processes CREATE TABLE ... PARTITION OF statements and it checks
* user creates the table as a partition of a distributed table. In that case, * if user creates the table as a partition of a distributed table. In that case,
* it distributes partition as well. Since the table itself is a partition, * it distributes partition as well. Since the table itself is a partition,
* CreateDistributedTable will attach it to its parent table automatically after * CreateDistributedTable will attach it to its parent table automatically after
* distributing it. * distributing it.
@ -173,9 +173,10 @@ PostprocessCreateTableStmtPartitionOf(CreateStmt *createStatement, const
/* /*
* PostprocessAlterTableStmtAttachPartition takes AlterTableStmt object as parameter * PostprocessAlterTableStmtAttachPartition takes AlterTableStmt object as
* but it only processes into ALTER TABLE ... ATTACH PARTITION commands and * parameter but it only processes into ALTER TABLE ... ATTACH PARTITION
* distributes the partition if necessary. There are four cases to consider; * commands and distributes the partition if necessary. There are four cases
* to consider;
* *
* Parent is not distributed, partition is not distributed: We do not need to * Parent is not distributed, partition is not distributed: We do not need to
* do anything in this case. * do anything in this case.
@ -250,9 +251,9 @@ PostprocessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement,
/* /*
* PostprocessAlterTableSchemaStmt is executed after the change has been applied locally, we * PostprocessAlterTableSchemaStmt is executed after the change has been applied
* can now use the new dependencies of the table to ensure all its dependencies exist on * locally, we can now use the new dependencies of the table to ensure all its
* the workers before we apply the commands remotely. * dependencies exist on the workers before we apply the commands remotely.
*/ */
List * List *
PostprocessAlterTableSchemaStmt(Node *node, const char *queryString) PostprocessAlterTableSchemaStmt(Node *node, const char *queryString)
@ -274,19 +275,18 @@ PostprocessAlterTableSchemaStmt(Node *node, const char *queryString)
/* /*
* PreprocessAlterTableStmt determines whether a given ALTER TABLE statement involves * PreprocessAlterTableStmt determines whether a given ALTER TABLE statement
* a distributed table. If so (and if the statement does not use unsupported * involves a distributed table. If so (and if the statement does not use
* options), it modifies the input statement to ensure proper execution against * unsupported options), it modifies the input statement to ensure proper
* the master node table and creates a DDLJob to encapsulate information needed * execution against the master node table and creates a DDLJob to encapsulate
* during the worker node portion of DDL execution before returning that DDLJob * information needed during the worker node portion of DDL execution before
* in a List. If no distributed table is involved, this function returns NIL. * returning that DDLJob in a List. If no distributed table is involved, this
* function returns NIL.
*/ */
List * List *
PreprocessAlterTableStmt(Node *node, const char *alterTableCommand) PreprocessAlterTableStmt(Node *node, const char *alterTableCommand)
{ {
AlterTableStmt *alterTableStatement = castNode(AlterTableStmt, node); AlterTableStmt *alterTableStatement = castNode(AlterTableStmt, node);
Oid rightRelationId = InvalidOid;
bool executeSequentially = false;
/* first check whether a distributed relation is affected */ /* first check whether a distributed relation is affected */
if (alterTableStatement->relation == NULL) if (alterTableStatement->relation == NULL)
@ -296,6 +296,7 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand)
LOCKMODE lockmode = AlterTableGetLockLevel(alterTableStatement->cmds); LOCKMODE lockmode = AlterTableGetLockLevel(alterTableStatement->cmds);
Oid leftRelationId = AlterTableLookupRelation(alterTableStatement, lockmode); Oid leftRelationId = AlterTableLookupRelation(alterTableStatement, lockmode);
if (!OidIsValid(leftRelationId)) if (!OidIsValid(leftRelationId))
{ {
return NIL; return NIL;
@ -309,11 +310,12 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand)
char leftRelationKind = get_rel_relkind(leftRelationId); char leftRelationKind = get_rel_relkind(leftRelationId);
if (leftRelationKind == RELKIND_INDEX) if (leftRelationKind == RELKIND_INDEX)
{ {
leftRelationId = IndexGetRelation(leftRelationId, false); bool missingOk = false;
leftRelationId = IndexGetRelation(leftRelationId, missingOk);
} }
bool isDistributedRelation = IsDistributedTable(leftRelationId); bool referencingIsLocalTable = !IsDistributedTable(leftRelationId);
if (!isDistributedRelation) if (referencingIsLocalTable)
{ {
return NIL; return NIL;
} }
@ -334,13 +336,19 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand)
ErrorIfUnsupportedAlterTableStmt(alterTableStatement); ErrorIfUnsupportedAlterTableStmt(alterTableStatement);
} }
/* these will be set in below loop according to subcommands */
Oid rightRelationId = InvalidOid;
bool executeSequentially = false;
/* /*
* We check if there is a ADD/DROP FOREIGN CONSTRAINT command in sub commands list. * We check if there is a ADD/DROP FOREIGN CONSTRAINT command in sub commands
* If there is we assign referenced relation id to rightRelationId and we also * list. If there is we assign referenced relation id to rightRelationId and
* set skip_validation to true to prevent PostgreSQL to verify validity of the * we also set skip_validation to true to prevent PostgreSQL to verify validity
* foreign constraint in master. Validity will be checked in workers anyway. * of the foreign constraint in master. Validity will be checked in workers
* anyway.
*/ */
List *commandList = alterTableStatement->cmds; List *commandList = alterTableStatement->cmds;
AlterTableCmd *command = NULL; AlterTableCmd *command = NULL;
foreach_ptr(command, commandList) foreach_ptr(command, commandList)
{ {
@ -438,6 +446,10 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand)
rightRelationId = RangeVarGetRelid(partitionCommand->name, NoLock, false); rightRelationId = RangeVarGetRelid(partitionCommand->name, NoLock, false);
} }
/*
* We check and set the execution mode only if we fall into either of first two
* conditional blocks, otherwise we already continue the loop
*/
executeSequentially |= SetupExecutionModeForAlterTable(leftRelationId, executeSequentially |= SetupExecutionModeForAlterTable(leftRelationId,
command); command);
} }
@ -447,14 +459,16 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand)
SetLocalMultiShardModifyModeToSequential(); SetLocalMultiShardModifyModeToSequential();
} }
/* fill them here as it is possible to use them in some condtional blocks below */
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = leftRelationId; ddlJob->targetRelationId = leftRelationId;
ddlJob->concurrentIndexCmd = false; ddlJob->concurrentIndexCmd = false;
ddlJob->commandString = alterTableCommand; ddlJob->commandString = alterTableCommand;
if (rightRelationId) if (OidIsValid(rightRelationId))
{ {
if (!IsDistributedTable(rightRelationId)) bool referencedIsLocalTable = !IsDistributedTable(rightRelationId);
if (referencedIsLocalTable)
{ {
ddlJob->taskList = NIL; ddlJob->taskList = NIL;
} }
@ -496,10 +510,11 @@ PreprocessAlterTableMoveAllStmt(Node *node, const char *queryString)
/* /*
* PreprocessAlterTableSchemaStmt is executed before the statement is applied to the local * PreprocessAlterTableSchemaStmt is executed before the statement is applied
* postgres instance. * to the local postgres instance.
* *
* In this stage we can prepare the commands that will alter the schemas of the shards. * In this stage we can prepare the commands that will alter the schemas of the
* shards.
*/ */
List * List *
PreprocessAlterTableSchemaStmt(Node *node, const char *queryString) PreprocessAlterTableSchemaStmt(Node *node, const char *queryString)
@ -664,11 +679,11 @@ ErrorIfAlterDropsPartitionColumn(AlterTableStmt *alterTableStatement)
/* /*
* PostprocessAlterTableStmt runs after the ALTER TABLE command has already run on the * PostprocessAlterTableStmt runs after the ALTER TABLE command has already run
* master, so we are checking constraints over the table with constraints already defined * on the master, so we are checking constraints over the table with constraints
* (to make the constraint check process same for ALTER TABLE and CREATE TABLE). If * already defined (to make the constraint check process same for ALTER TABLE and
* constraints do not fulfill the rules we defined, they will be removed and the table * CREATE TABLE). If constraints do not fulfill the rules we defined, they will be
* will return back to the state before the ALTER TABLE command. * removed and the table will return back to the state before the ALTER TABLE command.
*/ */
void void
PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement) PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
@ -845,11 +860,11 @@ ErrorIfUnsupportedConstraint(Relation relation, char distributionMethod,
Var *distributionColumn, uint32 colocationId) Var *distributionColumn, uint32 colocationId)
{ {
/* /*
* We first perform check for foreign constraints. It is important to do this check * We first perform check for foreign constraints. It is important to do this
* before next check, because other types of constraints are allowed on reference * check before next check, because other types of constraints are allowed on
* tables and we return early for those constraints thanks to next check. Therefore, * reference tables and we return early for those constraints thanks to next
* for reference tables, we first check for foreing constraints and if they are OK, * check. Therefore, for reference tables, we first check for foreign constraints
* we do not error out for other types of constraints. * and if they are OK, we do not error out for other types of constraints.
*/ */
ErrorIfUnsupportedForeignConstraintExists(relation, distributionMethod, ErrorIfUnsupportedForeignConstraintExists(relation, distributionMethod,
distributionColumn, distributionColumn,
@ -929,7 +944,8 @@ ErrorIfUnsupportedConstraint(Relation relation, char distributionMethod,
if (!hasDistributionColumn) if (!hasDistributionColumn)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ereport(ERROR, (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create constraint on \"%s\"", errmsg("cannot create constraint on \"%s\"",
relationName), relationName),
errdetail("Distributed relations cannot have UNIQUE, " errdetail("Distributed relations cannot have UNIQUE, "
@ -1434,13 +1450,13 @@ ErrorIfUnsupportedAlterAddConstraintStmt(AlterTableStmt *alterTableStatement)
/* /*
* AlterTableSchemaStmtObjectAddress returns the ObjectAddress of the table that is the * AlterTableSchemaStmtObjectAddress returns the ObjectAddress of the table that
* object of the AlterObjectSchemaStmt. * is the object of the AlterObjectSchemaStmt.
* *
* This could be called both before or after it has been applied locally. It will look in * This could be called both before or after it has been applied locally. It will
* the old schema first, if the table cannot be found in that schema it will look in the * look in the old schema first, if the table cannot be found in that schema it
* new schema. Errors if missing_ok is false and the table cannot be found in either of the * will look in the new schema. Errors if missing_ok is false and the table cannot
* schemas. * be found in either of the schemas.
*/ */
ObjectAddress ObjectAddress
AlterTableSchemaStmtObjectAddress(Node *node, bool missing_ok) AlterTableSchemaStmtObjectAddress(Node *node, bool missing_ok)
@ -1469,10 +1485,12 @@ AlterTableSchemaStmtObjectAddress(Node *node, bool missing_ok)
if (!missing_ok && tableOid == InvalidOid) if (!missing_ok && tableOid == InvalidOid)
{ {
const char *quotedTableName =
quote_qualified_identifier(stmt->relation->schemaname, tableName);
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_TABLE), ereport(ERROR, (errcode(ERRCODE_UNDEFINED_TABLE),
errmsg("relation \"%s\" does not exist", errmsg("relation \"%s\" does not exist",
quote_qualified_identifier(stmt->relation->schemaname, quotedTableName)));
tableName))));
} }
} }

View File

@ -234,6 +234,8 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
UseCoordinatedTransaction(); UseCoordinatedTransaction();
int32 localGroupId = GetLocalGroupId();
foreach_oid(relationId, relationIdList) foreach_oid(relationId, relationIdList)
{ {
/* /*
@ -256,7 +258,7 @@ AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
int nodePort = workerNode->workerPort; int nodePort = workerNode->workerPort;
/* if local node is one of the targets, acquire the lock locally */ /* if local node is one of the targets, acquire the lock locally */
if (workerNode->groupId == GetLocalGroupId()) if (workerNode->groupId == localGroupId)
{ {
LockRelationOid(relationId, lockMode); LockRelationOid(relationId, lockMode);
continue; continue;

View File

@ -1609,6 +1609,8 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
List *taskList = execution->tasksToExecute; List *taskList = execution->tasksToExecute;
bool hasReturning = execution->hasReturning; bool hasReturning = execution->hasReturning;
int32 localGroupId = GetLocalGroupId();
Task *task = NULL; Task *task = NULL;
foreach_ptr(task, taskList) foreach_ptr(task, taskList)
{ {
@ -1752,7 +1754,7 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
} }
if (!TransactionConnectedToLocalGroup && taskPlacement->groupId == if (!TransactionConnectedToLocalGroup && taskPlacement->groupId ==
GetLocalGroupId()) localGroupId)
{ {
TransactionConnectedToLocalGroup = true; TransactionConnectedToLocalGroup = true;
} }

View File

@ -532,10 +532,13 @@ GetCachedLocalPlan(Task *task, DistributedPlan *distributedPlan)
{ {
List *cachedPlanList = distributedPlan->workerJob->localPlannedStatements; List *cachedPlanList = distributedPlan->workerJob->localPlannedStatements;
LocalPlannedStatement *localPlannedStatement = NULL; LocalPlannedStatement *localPlannedStatement = NULL;
int32 localGroupId = GetLocalGroupId();
foreach_ptr(localPlannedStatement, cachedPlanList) foreach_ptr(localPlannedStatement, cachedPlanList)
{ {
if (localPlannedStatement->shardId == task->anchorShardId && if (localPlannedStatement->shardId == task->anchorShardId &&
localPlannedStatement->localGroupId == GetLocalGroupId()) localPlannedStatement->localGroupId == localGroupId)
{ {
/* already have a cached plan, no need to continue */ /* already have a cached plan, no need to continue */
return localPlannedStatement->localPlan; return localPlannedStatement->localPlan;

View File

@ -504,7 +504,7 @@ ShouldExecuteTasksLocally(List *taskList)
bool bool
TaskAccessesLocalNode(Task *task) TaskAccessesLocalNode(Task *task)
{ {
int localGroupId = GetLocalGroupId(); int32 localGroupId = GetLocalGroupId();
ShardPlacement *taskPlacement = NULL; ShardPlacement *taskPlacement = NULL;
foreach_ptr(taskPlacement, task->taskPlacementList) foreach_ptr(taskPlacement, task->taskPlacementList)

View File

@ -772,7 +772,7 @@ ShardStorageType(Oid relationId)
bool bool
IsCoordinator(void) IsCoordinator(void)
{ {
return (GetLocalGroupId() == 0); return (GetLocalGroupId() == COORDINATOR_GROUP_ID);
} }

View File

@ -199,7 +199,7 @@ ClusterHasKnownMetadataWorkers()
{ {
bool workerWithMetadata = false; bool workerWithMetadata = false;
if (GetLocalGroupId() != 0) if (!IsCoordinator())
{ {
workerWithMetadata = true; workerWithMetadata = true;
} }

View File

@ -1334,7 +1334,7 @@ GetNextNodeId()
void void
EnsureCoordinator(void) EnsureCoordinator(void)
{ {
int localGroupId = GetLocalGroupId(); int32 localGroupId = GetLocalGroupId();
if (localGroupId != 0) if (localGroupId != 0)
{ {

View File

@ -123,8 +123,8 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
return NULL; return NULL;
} }
int32 groupId = GetLocalGroupId(); int32 localGroupId = GetLocalGroupId();
if (groupId != 0 || groupId == GROUP_ID_UPGRADING) if (localGroupId != COORDINATOR_GROUP_ID || localGroupId == GROUP_ID_UPGRADING)
{ {
/* do not delegate from workers, or while upgrading */ /* do not delegate from workers, or while upgrading */
return NULL; return NULL;

View File

@ -226,6 +226,8 @@ get_global_active_transactions(PG_FUNCTION_ARGS)
/* add active transactions for local node */ /* add active transactions for local node */
StoreAllActiveTransactions(tupleStore, tupleDescriptor); StoreAllActiveTransactions(tupleStore, tupleDescriptor);
int32 localGroupId = GetLocalGroupId();
/* open connections in parallel */ /* open connections in parallel */
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList) foreach_ptr(workerNode, workerNodeList)
@ -234,7 +236,7 @@ get_global_active_transactions(PG_FUNCTION_ARGS)
int nodePort = workerNode->workerPort; int nodePort = workerNode->workerPort;
int connectionFlags = 0; int connectionFlags = 0;
if (workerNode->groupId == GetLocalGroupId()) if (workerNode->groupId == localGroupId)
{ {
/* we already get these transactions via GetAllActiveTransactions() */ /* we already get these transactions via GetAllActiveTransactions() */
continue; continue;
@ -726,7 +728,7 @@ AssignDistributedTransactionId(void)
&backendManagementShmemData->nextTransactionNumber; &backendManagementShmemData->nextTransactionNumber;
uint64 nextTransactionNumber = pg_atomic_fetch_add_u64(transactionNumberSequence, 1); uint64 nextTransactionNumber = pg_atomic_fetch_add_u64(transactionNumberSequence, 1);
int localGroupId = GetLocalGroupId(); int32 localGroupId = GetLocalGroupId();
TimestampTz currentTimestamp = GetCurrentTimestamp(); TimestampTz currentTimestamp = GetCurrentTimestamp();
Oid userId = GetUserId(); Oid userId = GetUserId();
@ -758,7 +760,7 @@ MarkCitusInitiatedCoordinatorBackend(void)
* GetLocalGroupId may throw exception which can cause leaving spin lock * GetLocalGroupId may throw exception which can cause leaving spin lock
* unreleased. Calling GetLocalGroupId function before the lock to avoid this. * unreleased. Calling GetLocalGroupId function before the lock to avoid this.
*/ */
int localGroupId = GetLocalGroupId(); int32 localGroupId = GetLocalGroupId();
SpinLockAcquire(&MyBackendData->mutex); SpinLockAcquire(&MyBackendData->mutex);

View File

@ -329,6 +329,8 @@ CitusStatActivity(const char *statQuery)
*/ */
char *nodeUser = CurrentUserName(); char *nodeUser = CurrentUserName();
int32 localGroupId = GetLocalGroupId();
/* open connections in parallel */ /* open connections in parallel */
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList) foreach_ptr(workerNode, workerNodeList)
@ -337,7 +339,7 @@ CitusStatActivity(const char *statQuery)
int nodePort = workerNode->workerPort; int nodePort = workerNode->workerPort;
int connectionFlags = 0; int connectionFlags = 0;
if (workerNode->groupId == GetLocalGroupId()) if (workerNode->groupId == localGroupId)
{ {
/* we already get these stats via GetLocalNodeCitusDistStat() */ /* we already get these stats via GetLocalNodeCitusDistStat() */
continue; continue;
@ -432,7 +434,7 @@ GetLocalNodeCitusDistStat(const char *statQuery)
return citusStatsList; return citusStatsList;
} }
int localGroupId = GetLocalGroupId(); int32 localGroupId = GetLocalGroupId();
/* get the current worker's node stats */ /* get the current worker's node stats */
List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock); List *workerNodeList = ActivePrimaryWorkerNodeList(NoLock);

View File

@ -105,7 +105,7 @@ CheckForDistributedDeadlocks(void)
{ {
HASH_SEQ_STATUS status; HASH_SEQ_STATUS status;
TransactionNode *transactionNode = NULL; TransactionNode *transactionNode = NULL;
int localGroupId = GetLocalGroupId(); int32 localGroupId = GetLocalGroupId();
List *workerNodeList = ActiveReadableNodeList(); List *workerNodeList = ActiveReadableNodeList();
/* /*
@ -368,6 +368,12 @@ ResetVisitedFields(HTAB *adjacencyList)
static bool static bool
AssociateDistributedTransactionWithBackendProc(TransactionNode *transactionNode) AssociateDistributedTransactionWithBackendProc(TransactionNode *transactionNode)
{ {
#ifdef USE_ASSERT_CHECKING
/* if assertions are disabled, it would give unused variable warning */
int32 localGroupId = GetLocalGroupId();
#endif
for (int backendIndex = 0; backendIndex < MaxBackends; ++backendIndex) for (int backendIndex = 0; backendIndex < MaxBackends; ++backendIndex)
{ {
PGPROC *currentProc = &ProcGlobal->allProcs[backendIndex]; PGPROC *currentProc = &ProcGlobal->allProcs[backendIndex];
@ -403,7 +409,7 @@ AssociateDistributedTransactionWithBackendProc(TransactionNode *transactionNode)
} }
/* at the point we should only have transactions initiated by this node */ /* at the point we should only have transactions initiated by this node */
Assert(currentTransactionId->initiatorNodeIdentifier == GetLocalGroupId()); Assert(currentTransactionId->initiatorNodeIdentifier == localGroupId);
transactionNode->initiatorProc = currentProc; transactionNode->initiatorProc = currentProc;

View File

@ -93,7 +93,7 @@ BuildGlobalWaitGraph(void)
List *workerNodeList = ActiveReadableNodeList(); List *workerNodeList = ActiveReadableNodeList();
char *nodeUser = CitusExtensionOwnerName(); char *nodeUser = CitusExtensionOwnerName();
List *connectionList = NIL; List *connectionList = NIL;
int localNodeId = GetLocalGroupId(); int32 localGroupId = GetLocalGroupId();
WaitGraph *waitGraph = BuildLocalWaitGraph(); WaitGraph *waitGraph = BuildLocalWaitGraph();
@ -105,7 +105,7 @@ BuildGlobalWaitGraph(void)
int nodePort = workerNode->workerPort; int nodePort = workerNode->workerPort;
int connectionFlags = 0; int connectionFlags = 0;
if (workerNode->groupId == localNodeId) if (workerNode->groupId == localGroupId)
{ {
/* we already have local wait edges */ /* we already have local wait edges */
continue; continue;

View File

@ -1356,7 +1356,7 @@ ParsePreparedTransactionName(char *preparedTransactionName,
*groupId = strtol(currentCharPointer, NULL, 10); *groupId = strtol(currentCharPointer, NULL, 10);
if ((*groupId == 0 && errno == EINVAL) || if ((*groupId == COORDINATOR_GROUP_ID && errno == EINVAL) ||
(*groupId == INT_MAX && errno == ERANGE)) (*groupId == INT_MAX && errno == ERANGE))
{ {
return false; return false;

View File

@ -396,7 +396,7 @@ PendingWorkerTransactionList(MultiConnection *connection)
StringInfo command = makeStringInfo(); StringInfo command = makeStringInfo();
bool raiseInterrupts = true; bool raiseInterrupts = true;
List *transactionNames = NIL; List *transactionNames = NIL;
int coordinatorId = GetLocalGroupId(); int32 coordinatorId = GetLocalGroupId();
appendStringInfo(command, "SELECT gid FROM pg_prepared_xacts " appendStringInfo(command, "SELECT gid FROM pg_prepared_xacts "
"WHERE gid LIKE 'citus\\_%d\\_%%'", "WHERE gid LIKE 'citus\\_%d\\_%%'",

View File

@ -201,6 +201,8 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
List *workerNodeList = ActivePrimaryWorkerNodeList(lockMode); List *workerNodeList = ActivePrimaryWorkerNodeList(lockMode);
List *result = NIL; List *result = NIL;
int32 localGroupId = GetLocalGroupId();
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerNodeList) foreach_ptr(workerNode, workerNodeList)
{ {
@ -208,8 +210,7 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
{ {
continue; continue;
} }
if (targetWorkerSet == OTHER_WORKERS && if (targetWorkerSet == OTHER_WORKERS && workerNode->groupId == localGroupId)
workerNode->groupId == GetLocalGroupId())
{ {
continue; continue;
} }

View File

@ -13,6 +13,7 @@
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "catalog/pg_class.h" #include "catalog/pg_class.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/master_protocol.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "distributed/worker_shard_visibility.h" #include "distributed/worker_shard_visibility.h"
#include "nodes/nodeFuncs.h" #include "nodes/nodeFuncs.h"
@ -116,8 +117,7 @@ RelationIsAKnownShard(Oid shardRelationId, bool onlySearchPath)
return false; return false;
} }
int localGroupId = GetLocalGroupId(); if (IsCoordinator())
if (localGroupId == 0)
{ {
bool coordinatorIsKnown = false; bool coordinatorIsKnown = false;
PrimaryNodeForGroup(0, &coordinatorIsKnown); PrimaryNodeForGroup(0, &coordinatorIsKnown);