Refactor ColumnAppearsInForeignKeyToReferenceTable (#4441)

pull/4445/head
Onur Tirtir 2020-12-23 11:44:02 +03:00 committed by GitHub
parent 90d63cb792
commit d1b3eaf767
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 66 additions and 17 deletions

View File

@ -64,6 +64,8 @@ static void ForeignConstraintFindDistKeys(HeapTuple pgConstraintTuple,
Var *referencedDistColumn,
int *referencingAttrIndex,
int *referencedAttrIndex);
static List * GetForeignKeyIdsForColumn(char *columnName, Oid relationId,
int searchForeignKeyColumnFlags);
static List * GetForeignConstraintCommandsInternal(Oid relationId, int flags);
static Oid get_relation_constraint_oid_compat(HeapTuple heapTuple);
static List * GetForeignKeyOidsToCitusLocalTables(Oid relationId);
@ -490,9 +492,45 @@ ForeignConstraintFindDistKeys(HeapTuple pgConstraintTuple,
bool
ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid relationId)
{
int searchForeignKeyColumnFlags = SEARCH_REFERENCING_RELATION |
SEARCH_REFERENCED_RELATION;
List *foreignKeyIdsColumnAppeared =
GetForeignKeyIdsForColumn(columnName, relationId, searchForeignKeyColumnFlags);
Oid foreignKeyId = InvalidOid;
foreach_oid(foreignKeyId, foreignKeyIdsColumnAppeared)
{
Oid referencedTableId = GetReferencedTableId(foreignKeyId);
if (IsCitusTableType(referencedTableId, REFERENCE_TABLE))
{
return true;
}
}
return false;
}
/*
* GetForeignKeyIdsForColumn takes columnName and relationId for the owning
* relation, and returns a list of OIDs for foreign constraints that the column
* with columnName is involved according to "searchForeignKeyColumnFlags" argument.
* See SearchForeignKeyColumnFlags enum definition for usage.
*/
static List *
GetForeignKeyIdsForColumn(char *columnName, Oid relationId,
int searchForeignKeyColumnFlags)
{
bool searchReferencing = searchForeignKeyColumnFlags & SEARCH_REFERENCING_RELATION;
bool searchReferenced = searchForeignKeyColumnFlags & SEARCH_REFERENCED_RELATION;
/* at least one of them should be true */
Assert(searchReferencing || searchReferenced);
List *foreignKeyIdsColumnAppeared = NIL;
ScanKeyData scanKey[1];
int scanKeyCount = 1;
bool foreignKeyToReferenceTableIncludesGivenColumn = false;
Relation pgConstraint = table_open(ConstraintRelationId, AccessShareLock);
@ -511,11 +549,11 @@ ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid relationId)
Oid referencedTableId = constraintForm->confrelid;
Oid referencingTableId = constraintForm->conrelid;
if (referencedTableId == relationId)
if (referencedTableId == relationId && searchReferenced)
{
pgConstraintKey = Anum_pg_constraint_confkey;
}
else if (referencingTableId == relationId)
else if (referencingTableId == relationId && searchReferencing)
{
pgConstraintKey = Anum_pg_constraint_conkey;
}
@ -529,22 +567,12 @@ ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid relationId)
continue;
}
/*
* We check if the referenced table is a reference table. There cannot be
* any foreign constraint from a distributed table to a local table.
*/
Assert(IsCitusTable(referencedTableId));
if (!IsCitusTableType(referencedTableId, REFERENCE_TABLE))
{
heapTuple = systable_getnext(scanDescriptor);
continue;
}
if (HeapTupleOfForeignConstraintIncludesColumn(heapTuple, relationId,
pgConstraintKey, columnName))
{
foreignKeyToReferenceTableIncludesGivenColumn = true;
break;
Oid foreignKeyOid = get_relation_constraint_oid_compat(heapTuple);
foreignKeyIdsColumnAppeared = lappend_oid(foreignKeyIdsColumnAppeared,
foreignKeyOid);
}
heapTuple = systable_getnext(scanDescriptor);
@ -554,7 +582,7 @@ ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid relationId)
systable_endscan(scanDescriptor);
table_close(pgConstraint, NoLock);
return foreignKeyToReferenceTableIncludesGivenColumn;
return foreignKeyIdsColumnAppeared;
}

View File

@ -64,6 +64,26 @@ typedef enum ExtractForeignKeyConstraintsMode
EXCLUDE_SELF_REFERENCES = 1 << 2
} ExtractForeignKeyConstraintMode;
/*
* Flags that can be passed to GetForeignKeyIdsForColumn to
* indicate whether relationId argument should match:
* - referencing relation or,
* - referenced relation,
* or we are searching for both sides.
*/
typedef enum SearchForeignKeyColumnFlags
{
/* relationId argument should match referencing relation */
SEARCH_REFERENCING_RELATION = 1 << 0,
/* relationId argument should match referenced relation */
SEARCH_REFERENCED_RELATION = 1 << 1,
/* callers can also pass union of above flags */
} SearchForeignKeyColumnFlags;
/* cluster.c - forward declarations */
extern List * PreprocessClusterStmt(Node *node, const char *clusterCommand);
@ -122,6 +142,7 @@ extern void ErrorIfUnsupportedForeignConstraintExists(Relation relation,
Var *distributionColumn,
uint32 colocationId);
extern void ErrorOutForFKeyBetweenPostgresAndCitusLocalTable(Oid localTableId);
extern bool ColumnReferencedByAnyForeignKey(char *columnName, Oid relationId);
extern bool ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid
relationId);
extern List * GetReferencingForeignConstaintCommands(Oid relationOid);