change ErrorIfUnsupportedForeignConstraintExists function to use in two-way

improve-drop-trigger2
Onur Tirtir 2020-03-04 11:13:10 +03:00
parent ef1447e360
commit 8cec216af1
1 changed files with 238 additions and 170 deletions

View File

@ -127,34 +127,35 @@ ConstraintIsAForeignKeyToReferenceTable(char *constraintName, Oid relationId)
* usage. * usage.
*/ */
void void
ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDistMethod, ErrorIfUnsupportedForeignConstraintExists(Relation relation, char distributionMethod,
Var *referencingDistKey, Var *distributionColumn,
uint32 referencingColocationId) uint32 colocationId)
{ {
ScanKeyData scanKey[1]; ScanKeyData scanKey[1];
int scanKeyCount = 1; int scanKeyCount = 1;
Oid relationOid = relation->rd_id; Oid relationOid = relation->rd_id;
bool relationNotReplicated = true;
bool relationIsCitusTable = IsCitusTable(relationOid); bool relationIsCitusTable = IsCitusTable(relationOid);
bool relationIsReferenceTable = (distributionMethod == DISTRIBUTE_BY_NONE);
bool relationNotReplicated = true;
if (relationIsCitusTable) if (relationIsCitusTable)
{ {
/* ALTER TABLE command is applied over single replicated table */ /* ALTER TABLE command is applied over single replicated table */
referencingNotReplicated = SingleReplicatedTable(referencingTableId); relationNotReplicated = SingleReplicatedTable(relationOid);
} }
else else
{ {
/* Creating single replicated table with foreign constraint */ /* Creating single replicated table with foreign constraint */
referencingNotReplicated = (ShardReplicationFactor == 1); relationNotReplicated = (ShardReplicationFactor == 1);
} }
Relation pgConstraint = heap_open(ConstraintRelationId, AccessShareLock); Relation pgConstraint = heap_open(ConstraintRelationId, AccessShareLock);
ScanKeyInit(&scanKey[0], Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ, ScanKeyInit(&scanKey[0], Anum_pg_constraint_contype, BTEqualStrategyNumber, F_CHAREQ,
relation->rd_id); CharGetDatum(CONSTRAINT_FOREIGN));
SysScanDesc scanDescriptor = systable_beginscan(pgConstraint, SysScanDesc scanDescriptor = systable_beginscan(pgConstraint,
ConstraintRelidTypidNameIndexId, InvalidOid,
true, NULL, false, NULL,
scanKeyCount, scanKey); scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor); HeapTuple heapTuple = systable_getnext(scanDescriptor);
@ -163,22 +164,25 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
{ {
Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple); Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
int referencingAttrIndex = -1; if (constraintForm->conrelid == relationOid)
{
/* alias variables from the referencing table perspective */
Oid referencingTableId = relationOid;
char referencingDistMethod = distributionMethod;
Var *referencingDistKey = distributionColumn;
uint32 referencingColocationId = colocationId;
bool referencingIsCitusTable = relationIsCitusTable;
bool referencingIsReferenceTable = relationIsReferenceTable;
bool referencingNotReplicated = relationNotReplicated;
char referencedDistMethod = 0; char referencedDistMethod = 0;
Var *referencedDistKey = NULL; Var *referencedDistKey = NULL;
int referencedAttrIndex = -1;
uint32 referencedColocationId = INVALID_COLOCATION_ID; uint32 referencedColocationId = INVALID_COLOCATION_ID;
/* not a foreign key constraint, skip to next one */
if (constraintForm->contype != CONSTRAINT_FOREIGN)
{
heapTuple = systable_getnext(scanDescriptor);
continue;
}
Oid referencedTableId = constraintForm->confrelid; Oid referencedTableId = constraintForm->confrelid;
bool referencedIsCitusTable = IsCitusTable(referencedTableId); bool referencedIsCitusTable = IsCitusTable(referencedTableId);
int referencedAttrIndex = -1;
bool selfReferencingTable = (referencingTableId == referencedTableId); bool selfReferencingTable = (referencingTableId == referencedTableId);
@ -192,8 +196,9 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
} }
/* set referenced table related variables here if table is referencing itself */ /* set referenced table related variables here if table is referencing itself */
if (!selfReferencingTable) if (!selfReferencingTable)
{
if (referencedIsCitusTable)
{ {
referencedDistMethod = PartitionMethod(referencedTableId); referencedDistMethod = PartitionMethod(referencedTableId);
referencedDistKey = (referencedDistMethod == DISTRIBUTE_BY_NONE) ? referencedDistKey = (referencedDistMethod == DISTRIBUTE_BY_NONE) ?
@ -201,6 +206,7 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
DistPartitionKey(referencedTableId); DistPartitionKey(referencedTableId);
referencedColocationId = TableColocationId(referencedTableId); referencedColocationId = TableColocationId(referencedTableId);
} }
}
else else
{ {
referencedDistMethod = referencingDistMethod; referencedDistMethod = referencingDistMethod;
@ -208,25 +214,56 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
referencedColocationId = referencingColocationId; referencedColocationId = referencingColocationId;
} }
bool referencingIsReferenceTable = (referencingDistMethod == DISTRIBUTE_BY_NONE); bool referencedIsReferenceTable = (referencedDistMethod ==
bool referencedIsReferenceTable = (referencedDistMethod == DISTRIBUTE_BY_NONE); DISTRIBUTE_BY_NONE);
/* foreign key constraint between reference tables */
if (referencingIsReferenceTable && referencedIsReferenceTable)
{
/* /*
* We support foreign keys between reference tables. No more checks * We support foreign keys between reference tables. No more checks
* are necessary. * are necessary.
*/ */
if (referencingIsReferenceTable && referencedIsReferenceTable)
{
heapTuple = systable_getnext(scanDescriptor); heapTuple = systable_getnext(scanDescriptor);
continue; continue;
} }
/* foreign key constraint from a reference table to a local table */
if (referencingIsReferenceTable && !referencedIsCitusTable)
{
/*
* Upgrading "a local table having a foreign key constraint to another
* local table" to a reference table is not supported.
*/
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("cannot create foreign key constraint"),
errdetail(
"Local table having a foreign key constraint to another "
"local table cannot be upgraded to a reference table"),
errhint(
"To define foreign key constraint from a reference table to a "
"local table, use ALTER TABLE ADD CONSTRAINT ... command after "
"creating the reference table without a foreign key")));
}
/* distributed table to local table */
if (!referencingIsReferenceTable && referencingIsCitusTable &&
!referencedIsCitusTable)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("cannot create foreign key constraint"),
errdetail(
"Foreign keys from distributed tables to local tables are not supported")));
}
if (referencingIsReferenceTable && referencedIsCitusTable &&
!referencedIsReferenceTable)
{
/* /*
* Foreign keys from reference tables to distributed tables are not * Foreign keys from reference tables to distributed tables are not
* supported. * supported.
*/ */
if (referencingIsReferenceTable && !referencedIsReferenceTable)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create foreign key constraint " errmsg("cannot create foreign key constraint "
"since foreign keys from reference tables " "since foreign keys from reference tables "
@ -254,6 +291,8 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
"distributed table or a reference table"))); "distributed table or a reference table")));
} }
int referencingAttrIndex = -1;
ForeignConstraintFindDistKeys(heapTuple, ForeignConstraintFindDistKeys(heapTuple,
referencingDistKey, referencingDistKey,
referencedDistKey, referencedDistKey,
@ -335,11 +374,40 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
errdetail("Citus Community Edition currently supports " errdetail("Citus Community Edition currently supports "
"foreign key constraints only for " "foreign key constraints only for "
"\"citus.shard_replication_factor = 1\"."), "\"citus.shard_replication_factor = 1\"."),
errhint("Please change \"citus.shard_replication_factor to " errhint(
"Please change \"citus.shard_replication_factor to "
"1\". To learn more about using foreign keys with " "1\". To learn more about using foreign keys with "
"other replication factors, please contact us at " "other replication factors, please contact us at "
"https://citusdata.com/about/contact_us."))); "https://citusdata.com/about/contact_us.")));
} }
}
else if (constraintForm->confrelid == relationOid)
{
/* alias variables from the referenced table perspective */
bool referencedIsReferenceTable = relationIsReferenceTable;
Oid referencingTableId = constraintForm->conrelid;
bool referencingIsCitusTable = IsCitusTable(referencingTableId);
/* foreign key constraint from a local table to a reference table */
if (!referencingIsCitusTable && referencedIsReferenceTable)
{
/*
* Upgrading "a local table referenced by another local table"
* to a reference table is supported, but not encouraged.
*/
ereport(WARNING, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg(
"should not create foreign key constraint in this way"),
errdetail(
"Local table referenced by another local table should not "
"be upgraded to a reference table"),
errhint(
"To define foreign key constraint from a local table to a "
"reference table properly, use ALTER TABLE ADD CONSTRAINT ... "
"command after creating the reference table without a foreign key")));
}
}
heapTuple = systable_getnext(scanDescriptor); heapTuple = systable_getnext(scanDescriptor);
} }
@ -430,7 +498,7 @@ ErrorIfUnsupportedAlterAddDropFKeyBetweenReferecenceAndLocalTable(Oid referencin
char referencingDistMethod = 0; char referencingDistMethod = 0;
bool referencingIsReferenceTable = false; bool referencingIsReferenceTable = false;
bool referencedIsDistributed = IsDistributedTable(referencedTableOid); bool referencedIsCitusTable = IsCitusTable(referencedTableOid);
char referencedDistMethod = 0; char referencedDistMethod = 0;
bool referencedIsReferenceTable = false; bool referencedIsReferenceTable = false;
@ -440,7 +508,7 @@ ErrorIfUnsupportedAlterAddDropFKeyBetweenReferecenceAndLocalTable(Oid referencin
referencingIsReferenceTable = (referencingDistMethod == DISTRIBUTE_BY_NONE); referencingIsReferenceTable = (referencingDistMethod == DISTRIBUTE_BY_NONE);
} }
if (referencedIsDistributed) if (referencedIsCitusTable)
{ {
referencedDistMethod = PartitionMethod(referencedTableOid); referencedDistMethod = PartitionMethod(referencedTableOid);
referencedIsReferenceTable = (referencedDistMethod == DISTRIBUTE_BY_NONE); referencedIsReferenceTable = (referencedDistMethod == DISTRIBUTE_BY_NONE);