Refactor ColumnAppearsInForeignKeyToReferenceTable (#4441)

(cherry picked from commit d1b3eaf767)
pull/4693/head
Onur Tirtir 2020-12-23 11:44:02 +03:00 committed by Ahmet Gedemenli
parent 7480160f4f
commit 26556b2bba
2 changed files with 66 additions and 17 deletions

View File

@ -64,6 +64,8 @@ static void ForeignConstraintFindDistKeys(HeapTuple pgConstraintTuple,
Var *referencedDistColumn, Var *referencedDistColumn,
int *referencingAttrIndex, int *referencingAttrIndex,
int *referencedAttrIndex); int *referencedAttrIndex);
static List * GetForeignKeyIdsForColumn(char *columnName, Oid relationId,
int searchForeignKeyColumnFlags);
static List * GetForeignConstraintCommandsInternal(Oid relationId, int flags); static List * GetForeignConstraintCommandsInternal(Oid relationId, int flags);
static Oid get_relation_constraint_oid_compat(HeapTuple heapTuple); static Oid get_relation_constraint_oid_compat(HeapTuple heapTuple);
static List * GetForeignKeyOidsToCitusLocalTables(Oid relationId); static List * GetForeignKeyOidsToCitusLocalTables(Oid relationId);
@ -490,9 +492,45 @@ ForeignConstraintFindDistKeys(HeapTuple pgConstraintTuple,
bool bool
ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid relationId) ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid relationId)
{ {
int searchForeignKeyColumnFlags = SEARCH_REFERENCING_RELATION |
SEARCH_REFERENCED_RELATION;
List *foreignKeyIdsColumnAppeared =
GetForeignKeyIdsForColumn(columnName, relationId, searchForeignKeyColumnFlags);
Oid foreignKeyId = InvalidOid;
foreach_oid(foreignKeyId, foreignKeyIdsColumnAppeared)
{
Oid referencedTableId = GetReferencedTableId(foreignKeyId);
if (IsCitusTableType(referencedTableId, REFERENCE_TABLE))
{
return true;
}
}
return false;
}
/*
* GetForeignKeyIdsForColumn takes columnName and relationId for the owning
* relation, and returns a list of OIDs for foreign constraints that the column
* with columnName is involved according to "searchForeignKeyColumnFlags" argument.
* See SearchForeignKeyColumnFlags enum definition for usage.
*/
static List *
GetForeignKeyIdsForColumn(char *columnName, Oid relationId,
int searchForeignKeyColumnFlags)
{
bool searchReferencing = searchForeignKeyColumnFlags & SEARCH_REFERENCING_RELATION;
bool searchReferenced = searchForeignKeyColumnFlags & SEARCH_REFERENCED_RELATION;
/* at least one of them should be true */
Assert(searchReferencing || searchReferenced);
List *foreignKeyIdsColumnAppeared = NIL;
ScanKeyData scanKey[1]; ScanKeyData scanKey[1];
int scanKeyCount = 1; int scanKeyCount = 1;
bool foreignKeyToReferenceTableIncludesGivenColumn = false;
Relation pgConstraint = table_open(ConstraintRelationId, AccessShareLock); Relation pgConstraint = table_open(ConstraintRelationId, AccessShareLock);
@ -511,11 +549,11 @@ ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid relationId)
Oid referencedTableId = constraintForm->confrelid; Oid referencedTableId = constraintForm->confrelid;
Oid referencingTableId = constraintForm->conrelid; Oid referencingTableId = constraintForm->conrelid;
if (referencedTableId == relationId) if (referencedTableId == relationId && searchReferenced)
{ {
pgConstraintKey = Anum_pg_constraint_confkey; pgConstraintKey = Anum_pg_constraint_confkey;
} }
else if (referencingTableId == relationId) else if (referencingTableId == relationId && searchReferencing)
{ {
pgConstraintKey = Anum_pg_constraint_conkey; pgConstraintKey = Anum_pg_constraint_conkey;
} }
@ -529,22 +567,12 @@ ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid relationId)
continue; continue;
} }
/*
* We check if the referenced table is a reference table. There cannot be
* any foreign constraint from a distributed table to a local table.
*/
Assert(IsCitusTable(referencedTableId));
if (!IsCitusTableType(referencedTableId, REFERENCE_TABLE))
{
heapTuple = systable_getnext(scanDescriptor);
continue;
}
if (HeapTupleOfForeignConstraintIncludesColumn(heapTuple, relationId, if (HeapTupleOfForeignConstraintIncludesColumn(heapTuple, relationId,
pgConstraintKey, columnName)) pgConstraintKey, columnName))
{ {
foreignKeyToReferenceTableIncludesGivenColumn = true; Oid foreignKeyOid = get_relation_constraint_oid_compat(heapTuple);
break; foreignKeyIdsColumnAppeared = lappend_oid(foreignKeyIdsColumnAppeared,
foreignKeyOid);
} }
heapTuple = systable_getnext(scanDescriptor); heapTuple = systable_getnext(scanDescriptor);
@ -554,7 +582,7 @@ ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid relationId)
systable_endscan(scanDescriptor); systable_endscan(scanDescriptor);
table_close(pgConstraint, NoLock); table_close(pgConstraint, NoLock);
return foreignKeyToReferenceTableIncludesGivenColumn; return foreignKeyIdsColumnAppeared;
} }

View File

@ -64,6 +64,26 @@ typedef enum ExtractForeignKeyConstraintsMode
EXCLUDE_SELF_REFERENCES = 1 << 2 EXCLUDE_SELF_REFERENCES = 1 << 2
} ExtractForeignKeyConstraintMode; } ExtractForeignKeyConstraintMode;
/*
* Flags that can be passed to GetForeignKeyIdsForColumn to
* indicate whether relationId argument should match:
* - referencing relation or,
* - referenced relation,
* or we are searching for both sides.
*/
typedef enum SearchForeignKeyColumnFlags
{
/* relationId argument should match referencing relation */
SEARCH_REFERENCING_RELATION = 1 << 0,
/* relationId argument should match referenced relation */
SEARCH_REFERENCED_RELATION = 1 << 1,
/* callers can also pass union of above flags */
} SearchForeignKeyColumnFlags;
/* cluster.c - forward declarations */ /* cluster.c - forward declarations */
extern List * PreprocessClusterStmt(Node *node, const char *clusterCommand); extern List * PreprocessClusterStmt(Node *node, const char *clusterCommand);
@ -119,6 +139,7 @@ extern void ErrorIfUnsupportedForeignConstraintExists(Relation relation,
Var *distributionColumn, Var *distributionColumn,
uint32 colocationId); uint32 colocationId);
extern void ErrorOutForFKeyBetweenPostgresAndCitusLocalTable(Oid localTableId); extern void ErrorOutForFKeyBetweenPostgresAndCitusLocalTable(Oid localTableId);
extern bool ColumnReferencedByAnyForeignKey(char *columnName, Oid relationId);
extern bool ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid extern bool ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid
relationId); relationId);
extern List * GetReferencingForeignConstaintCommands(Oid relationOid); extern List * GetReferencingForeignConstaintCommands(Oid relationOid);