Refactor the methods accessing to pg_constraint

Implement internal functions to accces to pg_contraint
and utilize them in existing foreign key checks.
pull/3846/head
Onur Tirtir 2020-05-20 16:14:09 +03:00
parent 80e34382cf
commit 79a688ffe0
5 changed files with 238 additions and 250 deletions

View File

@ -23,8 +23,10 @@
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "distributed/colocation_utils.h" #include "distributed/colocation_utils.h"
#include "distributed/commands.h" #include "distributed/commands.h"
#include "distributed/listutils.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/multi_join_order.h" #include "distributed/multi_join_order.h"
#include "distributed/reference_table_utils.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "utils/fmgroids.h" #include "utils/fmgroids.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
@ -33,68 +35,51 @@
#include "utils/ruleutils.h" #include "utils/ruleutils.h"
#include "utils/syscache.h" #include "utils/syscache.h"
/*
* Flags that can be passed to GetForeignKeyOids to indicate
* which foreign key constraint OIDs are to be extracted
*/
typedef enum ExtractForeignKeyConstrainstMode
{
/* extract the foreign key OIDs where the table is the referencing one */
INCLUDE_REFERENCING_CONSTRAINTS = 1 << 0,
/* extract the foreign key OIDs the table is the referenced one */
INCLUDE_REFERENCED_CONSTRAINTS = 1 << 1,
/* exclude the self-referencing foreign keys */
EXCLUDE_SELF_REFERENCES = 1 << 2
} ExtractForeignKeyConstraintMode;
/* Local functions forward declarations */ /* Local functions forward declarations */
static bool HeapTupleOfForeignConstraintIncludesColumn(HeapTuple heapTuple, Oid static bool HeapTupleOfForeignConstraintIncludesColumn(HeapTuple heapTuple,
relationId, int pgConstraintKey, Oid relationId,
int pgConstraintKey,
char *columnName); char *columnName);
static Oid FindForeignKeyOidWithName(List *foreignKeyOids, const
char *inputConstraintName);
static void ForeignConstraintFindDistKeys(HeapTuple pgConstraintTuple, static void ForeignConstraintFindDistKeys(HeapTuple pgConstraintTuple,
Var *referencingDistColumn, Var *referencingDistColumn,
Var *referencedDistColumn, Var *referencedDistColumn,
int *referencingAttrIndex, int *referencingAttrIndex,
int *referencedAttrIndex); int *referencedAttrIndex);
static List * GetForeignConstraintCommandsInternal(Oid relationId, int flags);
static Oid get_relation_constraint_oid_compat(HeapTuple heapTuple); static Oid get_relation_constraint_oid_compat(HeapTuple heapTuple);
static List * GetForeignKeyOidsToReferenceTables(Oid relationId);
static List * GetForeignKeyOids(Oid relationId, int flags);
/* /*
* ConstraintIsAForeignKeyToReferenceTable checks if the given constraint is a * ConstraintIsAForeignKeyToReferenceTable checks if the given constraint is a
* foreign key constraint from the given relation to a reference table. It does * foreign key constraint from the given relation to any reference table.
* that by scanning pg_constraint for foreign key constraints.
*/ */
bool bool
ConstraintIsAForeignKeyToReferenceTable(char *constraintName, Oid relationId) ConstraintIsAForeignKeyToReferenceTable(char *inputConstaintName, Oid relationId)
{ {
ScanKeyData scanKey[1]; List *foreignKeyOids = GetForeignKeyOidsToReferenceTables(relationId);
int scanKeyCount = 1;
bool foreignKeyToReferenceTable = false;
Oid foreignKeyOid = FindForeignKeyOidWithName(foreignKeyOids, inputConstaintName);
Relation pgConstraint = heap_open(ConstraintRelationId, AccessShareLock); return OidIsValid(foreignKeyOid);
ScanKeyInit(&scanKey[0], Anum_pg_constraint_contype, BTEqualStrategyNumber, F_CHAREQ,
CharGetDatum(CONSTRAINT_FOREIGN));
SysScanDesc scanDescriptor = systable_beginscan(pgConstraint, InvalidOid, false,
NULL, scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{
Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
char *tupleConstraintName = (constraintForm->conname).data;
if (strncmp(constraintName, tupleConstraintName, NAMEDATALEN) != 0 ||
constraintForm->conrelid != relationId)
{
heapTuple = systable_getnext(scanDescriptor);
continue;
}
Oid referencedTableId = constraintForm->confrelid;
Assert(IsCitusTable(referencedTableId));
if (PartitionMethod(referencedTableId) == DISTRIBUTE_BY_NONE)
{
foreignKeyToReferenceTable = true;
break;
}
heapTuple = systable_getnext(scanDescriptor);
}
/* clean up scan and close system catalog */
systable_endscan(scanDescriptor);
heap_close(pgConstraint, AccessShareLock);
return foreignKeyToReferenceTable;
} }
@ -123,9 +108,6 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
Var *referencingDistKey, Var *referencingDistKey,
uint32 referencingColocationId) uint32 referencingColocationId)
{ {
ScanKeyData scanKey[1];
int scanKeyCount = 1;
Oid referencingTableId = relation->rd_id; Oid referencingTableId = relation->rd_id;
bool referencingNotReplicated = true; bool referencingNotReplicated = true;
bool referencingIsCitus = IsCitusTable(referencingTableId); bool referencingIsCitus = IsCitusTable(referencingTableId);
@ -141,17 +123,16 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
referencingNotReplicated = (ShardReplicationFactor == 1); referencingNotReplicated = (ShardReplicationFactor == 1);
} }
Relation pgConstraint = heap_open(ConstraintRelationId, AccessShareLock); int flags = INCLUDE_REFERENCING_CONSTRAINTS;
ScanKeyInit(&scanKey[0], Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ, List *foreignKeyOids = GetForeignKeyOids(referencingTableId, flags);
relation->rd_id);
SysScanDesc scanDescriptor = systable_beginscan(pgConstraint,
ConstraintRelidTypidNameIndexId,
true, NULL,
scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor); Oid foreignKeyOid = InvalidOid;
while (HeapTupleIsValid(heapTuple)) foreach_oid(foreignKeyOid, foreignKeyOids)
{ {
HeapTuple heapTuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(foreignKeyOid));
Assert(HeapTupleIsValid(heapTuple));
Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple); Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
int referencingAttrIndex = -1; int referencingAttrIndex = -1;
@ -161,13 +142,6 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
int referencedAttrIndex = -1; int referencedAttrIndex = -1;
uint32 referencedColocationId = INVALID_COLOCATION_ID; uint32 referencedColocationId = INVALID_COLOCATION_ID;
/* not a foreign key constraint, skip to next one */
if (constraintForm->contype != CONSTRAINT_FOREIGN)
{
heapTuple = systable_getnext(scanDescriptor);
continue;
}
Oid referencedTableId = constraintForm->confrelid; Oid referencedTableId = constraintForm->confrelid;
bool referencedIsCitus = IsCitusTable(referencedTableId); bool referencedIsCitus = IsCitusTable(referencedTableId);
@ -207,7 +181,7 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
*/ */
if (referencingIsReferenceTable && referencedIsReferenceTable) if (referencingIsReferenceTable && referencedIsReferenceTable)
{ {
heapTuple = systable_getnext(scanDescriptor); ReleaseSysCache(heapTuple);
continue; continue;
} }
@ -330,18 +304,14 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
"https://citusdata.com/about/contact_us."))); "https://citusdata.com/about/contact_us.")));
} }
heapTuple = systable_getnext(scanDescriptor); ReleaseSysCache(heapTuple);
} }
/* clean up scan and close system catalog */
systable_endscan(scanDescriptor);
heap_close(pgConstraint, AccessShareLock);
} }
/* /*
* ForeignConstraintFindDistKeys finds the index of the given distribution columns * ForeignConstraintFindDistKeys finds the index of the given distribution columns
* in the given foreig key constraint and returns them in referencingAttrIndex * in the given foreign key constraint and returns them in referencingAttrIndex
* and referencedAttrIndex. If one of them is not found, it returns -1 instead. * and referencedAttrIndex. If one of them is not found, it returns -1 instead.
*/ */
static void static void
@ -400,9 +370,7 @@ ForeignConstraintFindDistKeys(HeapTuple pgConstraintTuple,
/* /*
* ColumnAppearsInForeignKeyToReferenceTable checks if there is a foreign key * ColumnAppearsInForeignKeyToReferenceTable checks if there is a foreign key
* constraint from/to a reference table on the given column. We iterate * constraint from/to any reference table on the given column.
* pg_constraint to fetch the constraint on the given relationId and find
* if any of the constraints includes the given column.
*/ */
bool bool
ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid relationId) ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid relationId)
@ -413,8 +381,8 @@ ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid relationId)
Relation pgConstraint = heap_open(ConstraintRelationId, AccessShareLock); Relation pgConstraint = heap_open(ConstraintRelationId, AccessShareLock);
ScanKeyInit(&scanKey[0], Anum_pg_constraint_contype, BTEqualStrategyNumber, F_CHAREQ, ScanKeyInit(&scanKey[0], Anum_pg_constraint_contype, BTEqualStrategyNumber,
CharGetDatum(CONSTRAINT_FOREIGN)); F_CHAREQ, CharGetDatum(CONSTRAINT_FOREIGN));
SysScanDesc scanDescriptor = systable_beginscan(pgConstraint, InvalidOid, false, SysScanDesc scanDescriptor = systable_beginscan(pgConstraint, InvalidOid, false,
NULL, scanKeyCount, scanKey); NULL, scanKeyCount, scanKey);
@ -476,16 +444,30 @@ ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid relationId)
/* /*
* GetTableForeignConstraints takes in a relationId, and returns the list of foreign * GetReferencingForeignConstaintCommands takes in a relationId, and
* constraint commands needed to reconstruct foreign constraints of that table. * returns the list of foreign constraint commands needed to reconstruct
* foreign key constraints that the table is involved in as the "referencing"
* one.
*/ */
List * List *
GetTableForeignConstraintCommands(Oid relationId) GetReferencingForeignConstaintCommands(Oid relationId)
{ {
List *tableForeignConstraints = NIL; int flags = INCLUDE_REFERENCING_CONSTRAINTS;
return GetForeignConstraintCommandsInternal(relationId, flags);
}
ScanKeyData scanKey[1];
int scanKeyCount = 1; /*
* GetForeignConstraintCommandsInternal is a wrapper function to get the
* DDL commands to recreate the foreign key constraints returned by
* GetForeignKeyOids. See more details at the underlying function.
*/
static List *
GetForeignConstraintCommandsInternal(Oid relationId, int flags)
{
List *foreignKeyOids = GetForeignKeyOids(relationId, flags);
List *foreignKeyCommands = NIL;
/* /*
* Set search_path to NIL so that all objects outside of pg_catalog will be * Set search_path to NIL so that all objects outside of pg_catalog will be
@ -497,41 +479,18 @@ GetTableForeignConstraintCommands(Oid relationId)
overridePath->addCatalog = true; overridePath->addCatalog = true;
PushOverrideSearchPath(overridePath); PushOverrideSearchPath(overridePath);
/* open system catalog and scan all constraints that belong to this table */ Oid foreignKeyOid = InvalidOid;
Relation pgConstraint = heap_open(ConstraintRelationId, AccessShareLock); foreach_oid(foreignKeyOid, foreignKeyOids)
ScanKeyInit(&scanKey[0], Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ,
relationId);
SysScanDesc scanDescriptor = systable_beginscan(pgConstraint,
ConstraintRelidTypidNameIndexId,
true, NULL,
scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{ {
Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple); char *statementDef = pg_get_constraintdef_command(foreignKeyOid);
bool inheritedConstraint = OidIsValid(constraintForm->conparentid); foreignKeyCommands = lappend(foreignKeyCommands, statementDef);
if (!inheritedConstraint && constraintForm->contype == CONSTRAINT_FOREIGN)
{
Oid constraintId = get_relation_constraint_oid_compat(heapTuple);
char *statementDef = pg_get_constraintdef_command(constraintId);
tableForeignConstraints = lappend(tableForeignConstraints, statementDef);
} }
heapTuple = systable_getnext(scanDescriptor);
}
/* clean up scan and close system catalog */
systable_endscan(scanDescriptor);
heap_close(pgConstraint, AccessShareLock);
/* revert back to original search_path */ /* revert back to original search_path */
PopOverrideSearchPath(); PopOverrideSearchPath();
return tableForeignConstraints; return foreignKeyCommands;
} }
@ -561,106 +520,68 @@ get_relation_constraint_oid_compat(HeapTuple heapTuple)
/* /*
* HasForeignKeyToReferenceTable function scans the pgConstraint table to * HasForeignKeyToReferenceTable function returns true if any of the foreign
* fetch all of the constraints on the given relationId and see if at least one * key constraints on the relation with relationId references to a reference
* of them is a foreign key referencing to a reference table. * table.
*/ */
bool bool
HasForeignKeyToReferenceTable(Oid relationId) HasForeignKeyToReferenceTable(Oid relationId)
{ {
ScanKeyData scanKey[1]; List *foreignKeyOids = GetForeignKeyOidsToReferenceTables(relationId);
int scanKeyCount = 1;
bool hasForeignKeyToReferenceTable = false;
Relation pgConstraint = heap_open(ConstraintRelationId, AccessShareLock); return list_length(foreignKeyOids) > 0;
ScanKeyInit(&scanKey[0], Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ, }
relationId);
SysScanDesc scanDescriptor = systable_beginscan(pgConstraint,
ConstraintRelidTypidNameIndexId,
true, NULL,
scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple)) /*
* GetForeignKeyOidsToReferenceTables function returns list of OIDs for the
* foreign key constraints on the given relationId that are referencing to
* reference tables.
*/
static List *
GetForeignKeyOidsToReferenceTables(Oid relationId)
{ {
int flags = INCLUDE_REFERENCING_CONSTRAINTS;
List *foreignKeyOids = GetForeignKeyOids(relationId, flags);
List *fkeyOidsToReferenceTables = NIL;
Oid foreignKeyOid = InvalidOid;
foreach_oid(foreignKeyOid, foreignKeyOids)
{
HeapTuple heapTuple =
SearchSysCache1(CONSTROID, ObjectIdGetDatum(foreignKeyOid));
Assert(HeapTupleIsValid(heapTuple));
Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple); Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
if (constraintForm->contype != CONSTRAINT_FOREIGN) Oid referencedTableOid = constraintForm->confrelid;
if (IsReferenceTable(referencedTableOid))
{ {
heapTuple = systable_getnext(scanDescriptor); fkeyOidsToReferenceTables = lappend_oid(fkeyOidsToReferenceTables,
continue; foreignKeyOid);
} }
Oid referencedTableId = constraintForm->confrelid; ReleaseSysCache(heapTuple);
if (!IsCitusTable(referencedTableId))
{
heapTuple = systable_getnext(scanDescriptor);
continue;
} }
if (PartitionMethod(referencedTableId) == DISTRIBUTE_BY_NONE) return fkeyOidsToReferenceTables;
{
hasForeignKeyToReferenceTable = true;
break;
}
heapTuple = systable_getnext(scanDescriptor);
}
/* clean up scan and close system catalog */
systable_endscan(scanDescriptor);
heap_close(pgConstraint, NoLock);
return hasForeignKeyToReferenceTable;
} }
/* /*
* TableReferenced function checks whether given table is referenced by another table * TableReferenced function checks whether given table is referenced by another table
* via foreign constraints. If it is referenced, this function returns true. To check * via foreign constraints. If it is referenced, this function returns true.
* that, this function searches for the given relation in the pg_constraint system
* catalog table. However since there are no indexes for the column we search for,
* this function performs sequential search. So call this function with caution.
*/ */
bool bool
TableReferenced(Oid relationId) TableReferenced(Oid relationId)
{ {
ScanKeyData scanKey[1]; int flags = INCLUDE_REFERENCED_CONSTRAINTS;
int scanKeyCount = 1; List *foreignKeyOids = GetForeignKeyOids(relationId, flags);
Oid scanIndexId = InvalidOid;
bool useIndex = false;
Relation pgConstraint = heap_open(ConstraintRelationId, AccessShareLock); return list_length(foreignKeyOids) > 0;
ScanKeyInit(&scanKey[0], Anum_pg_constraint_confrelid, BTEqualStrategyNumber, F_OIDEQ,
relationId);
SysScanDesc scanDescriptor = systable_beginscan(pgConstraint, scanIndexId, useIndex,
NULL,
scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{
Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
if (constraintForm->contype == CONSTRAINT_FOREIGN)
{
systable_endscan(scanDescriptor);
heap_close(pgConstraint, NoLock);
return true;
}
heapTuple = systable_getnext(scanDescriptor);
}
/* clean up scan and close system catalog */
systable_endscan(scanDescriptor);
heap_close(pgConstraint, NoLock);
return false;
} }
@ -696,89 +617,153 @@ HeapTupleOfForeignConstraintIncludesColumn(HeapTuple heapTuple, Oid relationId,
/* /*
* TableReferencing function checks whether given table is referencing by another table * TableReferencing function checks whether given table is referencing to another
* via foreign constraints. If it is referencing, this function returns true. To check * table via foreign key constraints. If it is referencing, this function returns
* that, this function searches given relation at pg_constraints system catalog. However * true.
* since there is no index for the column we searched, this function performs sequential
* search, therefore call this function with caution.
*/ */
bool bool
TableReferencing(Oid relationId) TableReferencing(Oid relationId)
{ {
ScanKeyData scanKey[1]; int flags = INCLUDE_REFERENCING_CONSTRAINTS;
int scanKeyCount = 1; List *foreignKeyOids = GetForeignKeyOids(relationId, flags);
Oid scanIndexId = InvalidOid;
bool useIndex = false;
Relation pgConstraint = heap_open(ConstraintRelationId, AccessShareLock); return list_length(foreignKeyOids) > 0;
ScanKeyInit(&scanKey[0], Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ,
relationId);
SysScanDesc scanDescriptor = systable_beginscan(pgConstraint, scanIndexId, useIndex,
NULL,
scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{
Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
if (constraintForm->contype == CONSTRAINT_FOREIGN)
{
systable_endscan(scanDescriptor);
heap_close(pgConstraint, NoLock);
return true;
}
heapTuple = systable_getnext(scanDescriptor);
}
systable_endscan(scanDescriptor);
heap_close(pgConstraint, NoLock);
return false;
} }
/* /*
* ConstraintIsAForeignKey returns true if the given constraint name * ConstraintIsAForeignKey returns true if the given constraint name
* is a foreign key to defined on the relation. * is a foreign key defined on the relation.
*/ */
bool bool
ConstraintIsAForeignKey(char *constraintNameInput, Oid relationId) ConstraintIsAForeignKey(char *inputConstaintName, Oid relationId)
{ {
int flags = INCLUDE_REFERENCING_CONSTRAINTS;
List *foreignKeyOids = GetForeignKeyOids(relationId, flags);
Oid foreignKeyOid = FindForeignKeyOidWithName(foreignKeyOids, inputConstaintName);
return OidIsValid(foreignKeyOid);
}
/*
* FindForeignKeyOidWithName searches the foreign key constraint with
* inputConstraintName in the given list of foreign key constraint OIDs.
* Returns the OID of the matching constraint. If there no matching constraint
* in the given list, then returns InvalidOid.
*/
static Oid
FindForeignKeyOidWithName(List *foreignKeyOids, const char *inputConstraintName)
{
Oid foreignKeyOid = InvalidOid;
foreach_oid(foreignKeyOid, foreignKeyOids)
{
char *constraintName = get_constraint_name(foreignKeyOid);
Assert(constraintName != NULL);
if (strncmp(constraintName, inputConstraintName, NAMEDATALEN) == 0)
{
return foreignKeyOid;
}
}
return InvalidOid;
}
/*
* GetForeignKeyOids takes in a relationId, and returns a list of OIDs for
* foreign constraints that the relation with relationId is involved according
* to "flags" argument. See ExtractForeignKeyConstrainstMode enum definition
* for usage of the flags.
*/
static List *
GetForeignKeyOids(Oid relationId, int flags)
{
AttrNumber pgConstraintTargetAttrNumber = InvalidAttrNumber;
bool extractReferencing PG_USED_FOR_ASSERTS_ONLY =
(flags & INCLUDE_REFERENCING_CONSTRAINTS);
bool extractReferenced PG_USED_FOR_ASSERTS_ONLY =
(flags & INCLUDE_REFERENCED_CONSTRAINTS);
/*
* Only one of them should be passed at a time since the way we scan
* pg_constraint differs for those columns. Anum_pg_constraint_conrelid
* supports index scan while Anum_pg_constraint_confrelid does not.
*/
Assert(!(extractReferencing && extractReferenced));
Assert(extractReferencing || extractReferenced);
bool useIndex = false;
Oid indexOid = InvalidOid;
if (flags & INCLUDE_REFERENCING_CONSTRAINTS)
{
pgConstraintTargetAttrNumber = Anum_pg_constraint_conrelid;
useIndex = true;
indexOid = ConstraintRelidTypidNameIndexId;
}
else if (flags & INCLUDE_REFERENCED_CONSTRAINTS)
{
pgConstraintTargetAttrNumber = Anum_pg_constraint_confrelid;
}
bool excludeSelfReference = (flags & EXCLUDE_SELF_REFERENCES);
List *foreignKeyOids = NIL;
ScanKeyData scanKey[1]; ScanKeyData scanKey[1];
int scanKeyCount = 1; int scanKeyCount = 1;
Relation pgConstraint = heap_open(ConstraintRelationId, AccessShareLock); Relation pgConstraint = heap_open(ConstraintRelationId, AccessShareLock);
ScanKeyInit(&scanKey[0], pgConstraintTargetAttrNumber,
ScanKeyInit(&scanKey[0], Anum_pg_constraint_contype, BTEqualStrategyNumber, F_CHAREQ, BTEqualStrategyNumber, F_OIDEQ, relationId);
CharGetDatum(CONSTRAINT_FOREIGN)); SysScanDesc scanDescriptor = systable_beginscan(pgConstraint, indexOid, useIndex,
SysScanDesc scanDescriptor = systable_beginscan(pgConstraint, InvalidOid, false,
NULL, scanKeyCount, scanKey); NULL, scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor); HeapTuple heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple)) while (HeapTupleIsValid(heapTuple))
{ {
Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple); Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
char *constraintName = (constraintForm->conname).data;
if (strncmp(constraintName, constraintNameInput, NAMEDATALEN) == 0 && if (constraintForm->contype != CONSTRAINT_FOREIGN)
constraintForm->conrelid == relationId)
{ {
systable_endscan(scanDescriptor); heapTuple = systable_getnext(scanDescriptor);
heap_close(pgConstraint, AccessShareLock); continue;
return true;
} }
bool inheritedConstraint = OidIsValid(constraintForm->conparentid);
if (inheritedConstraint)
{
/*
* We only consider the constraints that are explicitly created on
* the table as we already process the constraints from parent tables
* implicitly when a command is issued
*/
heapTuple = systable_getnext(scanDescriptor);
continue;
}
Oid constraintId = get_relation_constraint_oid_compat(heapTuple);
bool isSelfReference = (constraintForm->conrelid == constraintForm->confrelid);
if (excludeSelfReference && isSelfReference)
{
heapTuple = systable_getnext(scanDescriptor);
continue;
}
foreignKeyOids = lappend_oid(foreignKeyOids, constraintId);
heapTuple = systable_getnext(scanDescriptor); heapTuple = systable_getnext(scanDescriptor);
} }
/* clean up scan and close system catalog */
systable_endscan(scanDescriptor); systable_endscan(scanDescriptor);
heap_close(pgConstraint, AccessShareLock); heap_close(pgConstraint, AccessShareLock);
return false; return foreignKeyOids;
} }

View File

@ -489,7 +489,7 @@ EnsureTableListSuitableForReplication(List *tableIdList)
} }
List *foreignConstraintCommandList = List *foreignConstraintCommandList =
GetTableForeignConstraintCommands(tableId); GetReferencingForeignConstaintCommands(tableId);
if (foreignConstraintCommandList != NIL && if (foreignConstraintCommandList != NIL &&
PartitionMethod(tableId) != DISTRIBUTE_BY_NONE) PartitionMethod(tableId) != DISTRIBUTE_BY_NONE)
@ -812,7 +812,8 @@ CopyShardForeignConstraintCommandListGrouped(ShardInterval *shardInterval,
char *escapedSchemaName = quote_literal_cstr(schemaName); char *escapedSchemaName = quote_literal_cstr(schemaName);
int shardIndex = 0; int shardIndex = 0;
List *commandList = GetTableForeignConstraintCommands(shardInterval->relationId); List *commandList = GetReferencingForeignConstaintCommands(
shardInterval->relationId);
/* we will only use shardIndex if there is a foreign constraint */ /* we will only use shardIndex if there is a foreign constraint */
if (commandList != NIL) if (commandList != NIL)

View File

@ -378,7 +378,8 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
int attemptCount = replicationFactor; int attemptCount = replicationFactor;
int workerNodeCount = list_length(workerNodeList); int workerNodeCount = list_length(workerNodeList);
int placementsCreated = 0; int placementsCreated = 0;
List *foreignConstraintCommandList = GetTableForeignConstraintCommands(relationId); List *foreignConstraintCommandList =
GetReferencingForeignConstaintCommands(relationId);
bool includeSequenceDefaults = false; bool includeSequenceDefaults = false;
List *ddlCommandList = GetTableDDLEvents(relationId, includeSequenceDefaults); List *ddlCommandList = GetTableDDLEvents(relationId, includeSequenceDefaults);
uint32 connectionFlag = FOR_DDL; uint32 connectionFlag = FOR_DDL;
@ -487,7 +488,7 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
List *ddlCommandList = GetTableDDLEvents(distributedRelationId, List *ddlCommandList = GetTableDDLEvents(distributedRelationId,
includeSequenceDefaults); includeSequenceDefaults);
List *foreignConstraintCommandList = List *foreignConstraintCommandList =
GetTableForeignConstraintCommands(distributedRelationId); GetReferencingForeignConstaintCommands(distributedRelationId);
int taskId = 1; int taskId = 1;
List *taskList = NIL; List *taskList = NIL;

View File

@ -410,7 +410,7 @@ MetadataCreateCommands(void)
foreach_ptr(cacheEntry, propagatedTableList) foreach_ptr(cacheEntry, propagatedTableList)
{ {
List *foreignConstraintCommands = List *foreignConstraintCommands =
GetTableForeignConstraintCommands(cacheEntry->relationId); GetReferencingForeignConstaintCommands(cacheEntry->relationId);
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList, metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
foreignConstraintCommands); foreignConstraintCommands);
@ -497,7 +497,8 @@ GetDistributedTableDDLEvents(Oid relationId)
commandList = list_concat(commandList, shardMetadataInsertCommandList); commandList = list_concat(commandList, shardMetadataInsertCommandList);
/* commands to create foreign key constraints */ /* commands to create foreign key constraints */
List *foreignConstraintCommands = GetTableForeignConstraintCommands(relationId); List *foreignConstraintCommands =
GetReferencingForeignConstaintCommands(relationId);
commandList = list_concat(commandList, foreignConstraintCommands); commandList = list_concat(commandList, foreignConstraintCommands);
/* commands to create partitioning hierarchy */ /* commands to create partitioning hierarchy */

View File

@ -101,11 +101,11 @@ extern void ErrorIfUnsupportedForeignConstraintExists(Relation relation,
uint32 colocationId); uint32 colocationId);
extern bool ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid extern bool ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid
relationId); relationId);
extern List * GetTableForeignConstraintCommands(Oid relationId); extern List * GetReferencingForeignConstaintCommands(Oid relationOid);
extern bool HasForeignKeyToReferenceTable(Oid relationId); extern bool HasForeignKeyToReferenceTable(Oid relationOid);
extern bool TableReferenced(Oid relationId); extern bool TableReferenced(Oid relationOid);
extern bool TableReferencing(Oid relationId); extern bool TableReferencing(Oid relationOid);
extern bool ConstraintIsAForeignKey(char *constraintName, Oid relationId); extern bool ConstraintIsAForeignKey(char *inputConstaintName, Oid relationOid);
/* function.c - forward declarations */ /* function.c - forward declarations */