mirror of https://github.com/citusdata/citus.git
Merge pull request #2205 from citusdata/foreign_refactoring
Foreign key code refactoring into ddl/foreign_constraint.cpull/2201/merge
commit
a0651df574
1
Makefile
1
Makefile
|
@ -12,6 +12,7 @@ OBJS = src/backend/distributed/shared_library_init.o \
|
|||
src/backend/distributed/connection/connection_management.o \
|
||||
src/backend/distributed/connection/placement_connection.o \
|
||||
src/backend/distributed/connection/remote_commands.o \
|
||||
src/backend/distributed/ddl/foreign_constraint.o \
|
||||
src/backend/distributed/executor/citus_custom_scan.o \
|
||||
src/backend/distributed/executor/insert_select_executor.o \
|
||||
src/backend/distributed/executor/intermediate_results.o \
|
||||
|
|
|
@ -24,7 +24,7 @@ DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)
|
|||
DATA_built = $(foreach v,$(EXTVERSIONS),$(EXTENSION)--$(v).sql)
|
||||
|
||||
# directories with source files
|
||||
SUBDIRS = . commands connection executor master metadata planner progress relay test transaction utils worker
|
||||
SUBDIRS = . commands connection ddl executor master metadata planner progress relay test transaction utils worker
|
||||
|
||||
# That patsubst rule searches all directories listed in SUBDIRS for .c
|
||||
# files, and adds the corresponding .o files to OBJS
|
||||
|
|
|
@ -34,6 +34,7 @@
|
|||
#include "distributed/citus_ruleutils.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/distribution_column.h"
|
||||
#include "distributed/foreign_constraint.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
|
|
|
@ -0,0 +1,369 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* foreign_constraint.c
|
||||
*
|
||||
* This file contains functions to create, alter and drop foreign
|
||||
* constraints on distributed tables.
|
||||
*
|
||||
* Copyright (c) 2018, Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
#include "access/htup_details.h"
|
||||
#include "catalog/namespace.h"
|
||||
#include "catalog/pg_constraint.h"
|
||||
#include "catalog/pg_constraint_fn.h"
|
||||
#include "catalog/pg_type.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/foreign_constraint.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/multi_join_order.h"
|
||||
#include "utils/fmgroids.h"
|
||||
#include "utils/rel.h"
|
||||
#include "utils/relcache.h"
|
||||
#include "utils/ruleutils.h"
|
||||
#include "utils/syscache.h"
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfUnsupportedForeignConstraint runs checks related to foreign constraints and
|
||||
* errors out if it is not possible to create one of the foreign constraint in distributed
|
||||
* environment.
|
||||
*
|
||||
* To support foreign constraints, we require that;
|
||||
* - Referencing and referenced tables are hash distributed.
|
||||
* - Referencing and referenced tables are co-located.
|
||||
* - Foreign constraint is defined over distribution column.
|
||||
* - ON DELETE/UPDATE SET NULL, ON DELETE/UPDATE SET DEFAULT and ON UPDATE CASCADE options
|
||||
* are not used.
|
||||
* - Replication factors of referencing and referenced table are 1.
|
||||
*/
|
||||
void
|
||||
ErrorIfUnsupportedForeignConstraint(Relation relation, char distributionMethod,
|
||||
Var *distributionColumn, uint32 colocationId)
|
||||
{
|
||||
Relation pgConstraint = NULL;
|
||||
SysScanDesc scanDescriptor = NULL;
|
||||
ScanKeyData scanKey[1];
|
||||
int scanKeyCount = 1;
|
||||
HeapTuple heapTuple = NULL;
|
||||
|
||||
Oid referencingTableId = relation->rd_id;
|
||||
Oid referencedTableId = InvalidOid;
|
||||
uint32 referencedTableColocationId = INVALID_COLOCATION_ID;
|
||||
Var *referencedTablePartitionColumn = NULL;
|
||||
|
||||
Datum referencingColumnsDatum;
|
||||
Datum *referencingColumnArray;
|
||||
int referencingColumnCount = 0;
|
||||
Datum referencedColumnsDatum;
|
||||
Datum *referencedColumnArray;
|
||||
int referencedColumnCount = 0;
|
||||
bool isNull = false;
|
||||
int attrIdx = 0;
|
||||
bool foreignConstraintOnPartitionColumn = false;
|
||||
bool selfReferencingTable = false;
|
||||
|
||||
pgConstraint = heap_open(ConstraintRelationId, AccessShareLock);
|
||||
ScanKeyInit(&scanKey[0], Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ,
|
||||
relation->rd_id);
|
||||
scanDescriptor = systable_beginscan(pgConstraint, ConstraintRelidIndexId, true, NULL,
|
||||
scanKeyCount, scanKey);
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
while (HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
|
||||
bool singleReplicatedTable = true;
|
||||
|
||||
if (constraintForm->contype != CONSTRAINT_FOREIGN)
|
||||
{
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
continue;
|
||||
}
|
||||
|
||||
referencedTableId = constraintForm->confrelid;
|
||||
selfReferencingTable = referencingTableId == referencedTableId;
|
||||
|
||||
/*
|
||||
* We do not support foreign keys for reference tables. Here we skip the second
|
||||
* part of check if the table is a self referencing table because;
|
||||
* - PartitionMethod only works for distributed tables and this table may not be
|
||||
* distributed yet.
|
||||
* - Since referencing and referenced tables are same, it is OK to not checking
|
||||
* distribution method twice.
|
||||
*/
|
||||
if (distributionMethod == DISTRIBUTE_BY_NONE ||
|
||||
(!selfReferencingTable &&
|
||||
PartitionMethod(referencedTableId) == DISTRIBUTE_BY_NONE))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create foreign key constraint from or to "
|
||||
"reference tables")));
|
||||
}
|
||||
|
||||
/*
|
||||
* ON DELETE SET NULL and ON DELETE SET DEFAULT is not supported. Because we do
|
||||
* not want to set partition column to NULL or default value.
|
||||
*/
|
||||
if (constraintForm->confdeltype == FKCONSTR_ACTION_SETNULL ||
|
||||
constraintForm->confdeltype == FKCONSTR_ACTION_SETDEFAULT)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create foreign key constraint"),
|
||||
errdetail("SET NULL or SET DEFAULT is not supported"
|
||||
" in ON DELETE operation.")));
|
||||
}
|
||||
|
||||
/*
|
||||
* ON UPDATE SET NULL, ON UPDATE SET DEFAULT and UPDATE CASCADE is not supported.
|
||||
* Because we do not want to set partition column to NULL or default value. Also
|
||||
* cascading update operation would require re-partitioning. Updating partition
|
||||
* column value is not allowed anyway even outside of foreign key concept.
|
||||
*/
|
||||
if (constraintForm->confupdtype == FKCONSTR_ACTION_SETNULL ||
|
||||
constraintForm->confupdtype == FKCONSTR_ACTION_SETDEFAULT ||
|
||||
constraintForm->confupdtype == FKCONSTR_ACTION_CASCADE)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create foreign key constraint"),
|
||||
errdetail("SET NULL, SET DEFAULT or CASCADE is not"
|
||||
" supported in ON UPDATE operation.")));
|
||||
}
|
||||
|
||||
/*
|
||||
* Some checks are not meaningful if foreign key references the table itself.
|
||||
* Therefore we will skip those checks.
|
||||
*/
|
||||
if (!selfReferencingTable)
|
||||
{
|
||||
if (!IsDistributedTable(referencedTableId))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
|
||||
errmsg("cannot create foreign key constraint"),
|
||||
errdetail("Referenced table must be a distributed "
|
||||
"table.")));
|
||||
}
|
||||
|
||||
/* to enforce foreign constraints, tables must be co-located */
|
||||
referencedTableColocationId = TableColocationId(referencedTableId);
|
||||
if (colocationId == INVALID_COLOCATION_ID ||
|
||||
colocationId != referencedTableColocationId)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create foreign key constraint"),
|
||||
errdetail("Foreign key constraint can only be created"
|
||||
" on co-located tables.")));
|
||||
}
|
||||
|
||||
/*
|
||||
* Partition column must exist in both referencing and referenced side of the
|
||||
* foreign key constraint. They also must be in same ordinal.
|
||||
*/
|
||||
referencedTablePartitionColumn = DistPartitionKey(referencedTableId);
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Partition column must exist in both referencing and referenced side of the
|
||||
* foreign key constraint. They also must be in same ordinal.
|
||||
*/
|
||||
referencedTablePartitionColumn = distributionColumn;
|
||||
}
|
||||
|
||||
/*
|
||||
* Column attributes are not available in Form_pg_constraint, therefore we need
|
||||
* to find them in the system catalog. After finding them, we iterate over column
|
||||
* attributes together because partition column must be at the same place in both
|
||||
* referencing and referenced side of the foreign key constraint
|
||||
*/
|
||||
referencingColumnsDatum = SysCacheGetAttr(CONSTROID, heapTuple,
|
||||
Anum_pg_constraint_conkey, &isNull);
|
||||
referencedColumnsDatum = SysCacheGetAttr(CONSTROID, heapTuple,
|
||||
Anum_pg_constraint_confkey, &isNull);
|
||||
|
||||
deconstruct_array(DatumGetArrayTypeP(referencingColumnsDatum), INT2OID, 2, true,
|
||||
's', &referencingColumnArray, NULL, &referencingColumnCount);
|
||||
deconstruct_array(DatumGetArrayTypeP(referencedColumnsDatum), INT2OID, 2, true,
|
||||
's', &referencedColumnArray, NULL, &referencedColumnCount);
|
||||
|
||||
Assert(referencingColumnCount == referencedColumnCount);
|
||||
|
||||
for (attrIdx = 0; attrIdx < referencingColumnCount; ++attrIdx)
|
||||
{
|
||||
AttrNumber referencingAttrNo = DatumGetInt16(referencingColumnArray[attrIdx]);
|
||||
AttrNumber referencedAttrNo = DatumGetInt16(referencedColumnArray[attrIdx]);
|
||||
|
||||
if (distributionColumn->varattno == referencingAttrNo &&
|
||||
referencedTablePartitionColumn->varattno == referencedAttrNo)
|
||||
{
|
||||
foreignConstraintOnPartitionColumn = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!foreignConstraintOnPartitionColumn)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create foreign key constraint"),
|
||||
errdetail("Partition column must exist both "
|
||||
"referencing and referenced side of the "
|
||||
"foreign constraint statement and it must "
|
||||
"be in the same ordinal in both sides.")));
|
||||
}
|
||||
|
||||
/*
|
||||
* We do not allow to create foreign constraints if shard replication factor is
|
||||
* greater than 1. Because in our current design, multiple replicas may cause
|
||||
* locking problems and inconsistent shard contents. We don't check the referenced
|
||||
* table, since referenced and referencing tables should be co-located and
|
||||
* colocation check has been done above.
|
||||
*/
|
||||
if (IsDistributedTable(referencingTableId))
|
||||
{
|
||||
/* check whether ALTER TABLE command is applied over single replicated table */
|
||||
if (!SingleReplicatedTable(referencingTableId))
|
||||
{
|
||||
singleReplicatedTable = false;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/* check whether creating single replicated table with foreign constraint */
|
||||
if (ShardReplicationFactor > 1)
|
||||
{
|
||||
singleReplicatedTable = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (!singleReplicatedTable)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create foreign key constraint"),
|
||||
errdetail("Citus Community Edition currently supports "
|
||||
"foreign key constraints only for "
|
||||
"\"citus.shard_replication_factor = 1\"."),
|
||||
errhint("Please change \"citus.shard_replication_factor to "
|
||||
"1\". To learn more about using foreign keys with "
|
||||
"other replication factors, please contact us at "
|
||||
"https://citusdata.com/about/contact_us.")));
|
||||
}
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
}
|
||||
|
||||
/* clean up scan and close system catalog */
|
||||
systable_endscan(scanDescriptor);
|
||||
heap_close(pgConstraint, AccessShareLock);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetTableForeignConstraints takes in a relationId, and returns the list of foreign
|
||||
* constraint commands needed to reconstruct foreign constraints of that table.
|
||||
*/
|
||||
List *
|
||||
GetTableForeignConstraintCommands(Oid relationId)
|
||||
{
|
||||
List *tableForeignConstraints = NIL;
|
||||
|
||||
Relation pgConstraint = NULL;
|
||||
SysScanDesc scanDescriptor = NULL;
|
||||
ScanKeyData scanKey[1];
|
||||
int scanKeyCount = 1;
|
||||
HeapTuple heapTuple = NULL;
|
||||
|
||||
/*
|
||||
* Set search_path to NIL so that all objects outside of pg_catalog will be
|
||||
* schema-prefixed. pg_catalog will be added automatically when we call
|
||||
* PushOverrideSearchPath(), since we set addCatalog to true;
|
||||
*/
|
||||
OverrideSearchPath *overridePath = GetOverrideSearchPath(CurrentMemoryContext);
|
||||
overridePath->schemas = NIL;
|
||||
overridePath->addCatalog = true;
|
||||
PushOverrideSearchPath(overridePath);
|
||||
|
||||
/* open system catalog and scan all constraints that belong to this table */
|
||||
pgConstraint = heap_open(ConstraintRelationId, AccessShareLock);
|
||||
ScanKeyInit(&scanKey[0], Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ,
|
||||
relationId);
|
||||
scanDescriptor = systable_beginscan(pgConstraint, ConstraintRelidIndexId, true, NULL,
|
||||
scanKeyCount, scanKey);
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
while (HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
|
||||
|
||||
if (constraintForm->contype == CONSTRAINT_FOREIGN)
|
||||
{
|
||||
Oid constraintId = get_relation_constraint_oid(relationId,
|
||||
constraintForm->conname.data,
|
||||
true);
|
||||
char *statementDef = pg_get_constraintdef_command(constraintId);
|
||||
|
||||
tableForeignConstraints = lappend(tableForeignConstraints, statementDef);
|
||||
}
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
}
|
||||
|
||||
/* clean up scan and close system catalog */
|
||||
systable_endscan(scanDescriptor);
|
||||
heap_close(pgConstraint, AccessShareLock);
|
||||
|
||||
/* revert back to original search_path */
|
||||
PopOverrideSearchPath();
|
||||
|
||||
return tableForeignConstraints;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TableReferenced function checks whether given table is referenced by another table
|
||||
* via foreign constraints. If it is referenced, this function returns true. To check
|
||||
* that, this function searches given relation at pg_constraints system catalog. However
|
||||
* since there is no index for the column we searched, this function performs sequential
|
||||
* search, therefore call this function with caution.
|
||||
*/
|
||||
bool
|
||||
TableReferenced(Oid relationId)
|
||||
{
|
||||
Relation pgConstraint = NULL;
|
||||
HeapTuple heapTuple = NULL;
|
||||
SysScanDesc scanDescriptor = NULL;
|
||||
ScanKeyData scanKey[1];
|
||||
int scanKeyCount = 1;
|
||||
Oid scanIndexId = InvalidOid;
|
||||
bool useIndex = false;
|
||||
|
||||
pgConstraint = heap_open(ConstraintRelationId, AccessShareLock);
|
||||
|
||||
ScanKeyInit(&scanKey[0], Anum_pg_constraint_confrelid, BTEqualStrategyNumber, F_OIDEQ,
|
||||
relationId);
|
||||
scanDescriptor = systable_beginscan(pgConstraint, scanIndexId, useIndex, NULL,
|
||||
scanKeyCount, scanKey);
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
while (HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
|
||||
|
||||
if (constraintForm->contype == CONSTRAINT_FOREIGN)
|
||||
{
|
||||
systable_endscan(scanDescriptor);
|
||||
heap_close(pgConstraint, NoLock);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
}
|
||||
|
||||
systable_endscan(scanDescriptor);
|
||||
heap_close(pgConstraint, NoLock);
|
||||
|
||||
return false;
|
||||
}
|
|
@ -25,18 +25,16 @@
|
|||
#include "catalog/dependency.h"
|
||||
#include "catalog/index.h"
|
||||
#include "catalog/indexing.h"
|
||||
#include "catalog/namespace.h"
|
||||
#include "catalog/pg_attribute.h"
|
||||
#include "catalog/pg_class.h"
|
||||
#include "citus_version.h"
|
||||
#include "catalog/pg_constraint.h"
|
||||
#include "catalog/pg_type.h"
|
||||
#include "commands/dbcommands.h"
|
||||
#include "commands/defrem.h"
|
||||
#include "commands/tablecmds.h"
|
||||
#include "commands/prepare.h"
|
||||
#include "distributed/citus_ruleutils.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/foreign_constraint.h"
|
||||
#include "distributed/intermediate_results.h"
|
||||
#include "distributed/maintenanced.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
|
@ -44,7 +42,6 @@
|
|||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/metadata_sync.h"
|
||||
#include "distributed/multi_copy.h"
|
||||
#include "distributed/multi_join_order.h"
|
||||
#include "distributed/multi_partitioning_utils.h"
|
||||
#include "distributed/distributed_planner.h"
|
||||
#include "distributed/multi_router_executor.h"
|
||||
|
@ -78,15 +75,12 @@
|
|||
#include "utils/builtins.h"
|
||||
#include "utils/elog.h"
|
||||
#include "utils/errcodes.h"
|
||||
#include "utils/fmgroids.h"
|
||||
#include "utils/guc.h"
|
||||
#include "utils/hsearch.h"
|
||||
#include "utils/inval.h"
|
||||
#include "utils/lsyscache.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "utils/palloc.h"
|
||||
#include "utils/rel.h"
|
||||
#include "utils/relcache.h"
|
||||
#include "utils/syscache.h"
|
||||
|
||||
|
||||
|
@ -148,10 +142,6 @@ static void ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement);
|
|||
static bool OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId);
|
||||
static void ErrorIfUnsupportedRenameStmt(RenameStmt *renameStmt);
|
||||
static void ErrorIfUnsupportedAlterAddConstraintStmt(AlterTableStmt *alterTableStatement);
|
||||
static void ErrorIfUnsupportedForeignConstraint(Relation relation,
|
||||
char distributionMethod,
|
||||
Var *distributionColumn,
|
||||
uint32 colocationId);
|
||||
|
||||
/* Local functions forward declarations for helper functions */
|
||||
static char * ExtractNewExtensionVersion(Node *parsetree);
|
||||
|
@ -2520,238 +2510,6 @@ ErrorIfUnsupportedConstraint(Relation relation, char distributionMethod,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfUnsupportedForeignConstraint runs checks related to foreign constraints and
|
||||
* errors out if it is not possible to create one of the foreign constraint in distributed
|
||||
* environment.
|
||||
*
|
||||
* To support foreign constraints, we require that;
|
||||
* - Referencing and referenced tables are hash distributed.
|
||||
* - Referencing and referenced tables are co-located.
|
||||
* - Foreign constraint is defined over distribution column.
|
||||
* - ON DELETE/UPDATE SET NULL, ON DELETE/UPDATE SET DEFAULT and ON UPDATE CASCADE options
|
||||
* are not used.
|
||||
* - Replication factors of referencing and referenced table are 1.
|
||||
*/
|
||||
static void
|
||||
ErrorIfUnsupportedForeignConstraint(Relation relation, char distributionMethod,
|
||||
Var *distributionColumn, uint32 colocationId)
|
||||
{
|
||||
Relation pgConstraint = NULL;
|
||||
SysScanDesc scanDescriptor = NULL;
|
||||
ScanKeyData scanKey[1];
|
||||
int scanKeyCount = 1;
|
||||
HeapTuple heapTuple = NULL;
|
||||
|
||||
Oid referencingTableId = relation->rd_id;
|
||||
Oid referencedTableId = InvalidOid;
|
||||
uint32 referencedTableColocationId = INVALID_COLOCATION_ID;
|
||||
Var *referencedTablePartitionColumn = NULL;
|
||||
|
||||
Datum referencingColumnsDatum;
|
||||
Datum *referencingColumnArray;
|
||||
int referencingColumnCount = 0;
|
||||
Datum referencedColumnsDatum;
|
||||
Datum *referencedColumnArray;
|
||||
int referencedColumnCount = 0;
|
||||
bool isNull = false;
|
||||
int attrIdx = 0;
|
||||
bool foreignConstraintOnPartitionColumn = false;
|
||||
bool selfReferencingTable = false;
|
||||
|
||||
pgConstraint = heap_open(ConstraintRelationId, AccessShareLock);
|
||||
ScanKeyInit(&scanKey[0], Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ,
|
||||
relation->rd_id);
|
||||
scanDescriptor = systable_beginscan(pgConstraint, ConstraintRelidIndexId, true, NULL,
|
||||
scanKeyCount, scanKey);
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
while (HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
|
||||
bool singleReplicatedTable = true;
|
||||
|
||||
if (constraintForm->contype != CONSTRAINT_FOREIGN)
|
||||
{
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
continue;
|
||||
}
|
||||
|
||||
referencedTableId = constraintForm->confrelid;
|
||||
selfReferencingTable = referencingTableId == referencedTableId;
|
||||
|
||||
/*
|
||||
* We do not support foreign keys for reference tables. Here we skip the second
|
||||
* part of check if the table is a self referencing table because;
|
||||
* - PartitionMethod only works for distributed tables and this table may not be
|
||||
* distributed yet.
|
||||
* - Since referencing and referenced tables are same, it is OK to not checking
|
||||
* distribution method twice.
|
||||
*/
|
||||
if (distributionMethod == DISTRIBUTE_BY_NONE ||
|
||||
(!selfReferencingTable &&
|
||||
PartitionMethod(referencedTableId) == DISTRIBUTE_BY_NONE))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create foreign key constraint from or to "
|
||||
"reference tables")));
|
||||
}
|
||||
|
||||
/*
|
||||
* ON DELETE SET NULL and ON DELETE SET DEFAULT is not supported. Because we do
|
||||
* not want to set partition column to NULL or default value.
|
||||
*/
|
||||
if (constraintForm->confdeltype == FKCONSTR_ACTION_SETNULL ||
|
||||
constraintForm->confdeltype == FKCONSTR_ACTION_SETDEFAULT)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create foreign key constraint"),
|
||||
errdetail("SET NULL or SET DEFAULT is not supported"
|
||||
" in ON DELETE operation.")));
|
||||
}
|
||||
|
||||
/*
|
||||
* ON UPDATE SET NULL, ON UPDATE SET DEFAULT and UPDATE CASCADE is not supported.
|
||||
* Because we do not want to set partition column to NULL or default value. Also
|
||||
* cascading update operation would require re-partitioning. Updating partition
|
||||
* column value is not allowed anyway even outside of foreign key concept.
|
||||
*/
|
||||
if (constraintForm->confupdtype == FKCONSTR_ACTION_SETNULL ||
|
||||
constraintForm->confupdtype == FKCONSTR_ACTION_SETDEFAULT ||
|
||||
constraintForm->confupdtype == FKCONSTR_ACTION_CASCADE)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create foreign key constraint"),
|
||||
errdetail("SET NULL, SET DEFAULT or CASCADE is not"
|
||||
" supported in ON UPDATE operation.")));
|
||||
}
|
||||
|
||||
/*
|
||||
* Some checks are not meaningful if foreign key references the table itself.
|
||||
* Therefore we will skip those checks.
|
||||
*/
|
||||
if (!selfReferencingTable)
|
||||
{
|
||||
if (!IsDistributedTable(referencedTableId))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
|
||||
errmsg("cannot create foreign key constraint"),
|
||||
errdetail("Referenced table must be a distributed "
|
||||
"table.")));
|
||||
}
|
||||
|
||||
/* to enforce foreign constraints, tables must be co-located */
|
||||
referencedTableColocationId = TableColocationId(referencedTableId);
|
||||
if (colocationId == INVALID_COLOCATION_ID ||
|
||||
colocationId != referencedTableColocationId)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create foreign key constraint"),
|
||||
errdetail("Foreign key constraint can only be created"
|
||||
" on co-located tables.")));
|
||||
}
|
||||
|
||||
/*
|
||||
* Partition column must exist in both referencing and referenced side of the
|
||||
* foreign key constraint. They also must be in same ordinal.
|
||||
*/
|
||||
referencedTablePartitionColumn = DistPartitionKey(referencedTableId);
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Partition column must exist in both referencing and referenced side of the
|
||||
* foreign key constraint. They also must be in same ordinal.
|
||||
*/
|
||||
referencedTablePartitionColumn = distributionColumn;
|
||||
}
|
||||
|
||||
/*
|
||||
* Column attributes are not available in Form_pg_constraint, therefore we need
|
||||
* to find them in the system catalog. After finding them, we iterate over column
|
||||
* attributes together because partition column must be at the same place in both
|
||||
* referencing and referenced side of the foreign key constraint
|
||||
*/
|
||||
referencingColumnsDatum = SysCacheGetAttr(CONSTROID, heapTuple,
|
||||
Anum_pg_constraint_conkey, &isNull);
|
||||
referencedColumnsDatum = SysCacheGetAttr(CONSTROID, heapTuple,
|
||||
Anum_pg_constraint_confkey, &isNull);
|
||||
|
||||
deconstruct_array(DatumGetArrayTypeP(referencingColumnsDatum), INT2OID, 2, true,
|
||||
's', &referencingColumnArray, NULL, &referencingColumnCount);
|
||||
deconstruct_array(DatumGetArrayTypeP(referencedColumnsDatum), INT2OID, 2, true,
|
||||
's', &referencedColumnArray, NULL, &referencedColumnCount);
|
||||
|
||||
Assert(referencingColumnCount == referencedColumnCount);
|
||||
|
||||
for (attrIdx = 0; attrIdx < referencingColumnCount; ++attrIdx)
|
||||
{
|
||||
AttrNumber referencingAttrNo = DatumGetInt16(referencingColumnArray[attrIdx]);
|
||||
AttrNumber referencedAttrNo = DatumGetInt16(referencedColumnArray[attrIdx]);
|
||||
|
||||
if (distributionColumn->varattno == referencingAttrNo &&
|
||||
referencedTablePartitionColumn->varattno == referencedAttrNo)
|
||||
{
|
||||
foreignConstraintOnPartitionColumn = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!foreignConstraintOnPartitionColumn)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create foreign key constraint"),
|
||||
errdetail("Partition column must exist both "
|
||||
"referencing and referenced side of the "
|
||||
"foreign constraint statement and it must "
|
||||
"be in the same ordinal in both sides.")));
|
||||
}
|
||||
|
||||
/*
|
||||
* We do not allow to create foreign constraints if shard replication factor is
|
||||
* greater than 1. Because in our current design, multiple replicas may cause
|
||||
* locking problems and inconsistent shard contents. We don't check the referenced
|
||||
* table, since referenced and referencing tables should be co-located and
|
||||
* colocation check has been done above.
|
||||
*/
|
||||
if (IsDistributedTable(referencingTableId))
|
||||
{
|
||||
/* check whether ALTER TABLE command is applied over single replicated table */
|
||||
if (!SingleReplicatedTable(referencingTableId))
|
||||
{
|
||||
singleReplicatedTable = false;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/* check whether creating single replicated table with foreign constraint */
|
||||
if (ShardReplicationFactor > 1)
|
||||
{
|
||||
singleReplicatedTable = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (!singleReplicatedTable)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create foreign key constraint"),
|
||||
errdetail("Citus Community Edition currently supports "
|
||||
"foreign key constraints only for "
|
||||
"\"citus.shard_replication_factor = 1\"."),
|
||||
errhint("Please change \"citus.shard_replication_factor to "
|
||||
"1\". To learn more about using foreign keys with "
|
||||
"other replication factors, please contact us at "
|
||||
"https://citusdata.com/about/contact_us.")));
|
||||
}
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
}
|
||||
|
||||
/* clean up scan and close system catalog */
|
||||
systable_endscan(scanDescriptor);
|
||||
heap_close(pgConstraint, AccessShareLock);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfUnsupportedSeqStmt errors out if the provided create sequence
|
||||
* statement specifies a distributed table in its OWNED BY clause.
|
||||
|
|
|
@ -1380,51 +1380,3 @@ TableOwner(Oid relationId)
|
|||
|
||||
return GetUserNameFromId(userId, false);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TableReferenced function checks whether given table is referenced by another table
|
||||
* via foreign constraints. If it is referenced, this function returns true. To check
|
||||
* that, this function searches given relation at pg_constraints system catalog. However
|
||||
* since there is no index for the column we searched, this function performs sequential
|
||||
* search, therefore call this function with caution.
|
||||
*/
|
||||
bool
|
||||
TableReferenced(Oid relationId)
|
||||
{
|
||||
Relation pgConstraint = NULL;
|
||||
HeapTuple heapTuple = NULL;
|
||||
SysScanDesc scanDescriptor = NULL;
|
||||
ScanKeyData scanKey[1];
|
||||
int scanKeyCount = 1;
|
||||
Oid scanIndexId = InvalidOid;
|
||||
bool useIndex = false;
|
||||
|
||||
pgConstraint = heap_open(ConstraintRelationId, AccessShareLock);
|
||||
|
||||
ScanKeyInit(&scanKey[0], Anum_pg_constraint_confrelid, BTEqualStrategyNumber, F_OIDEQ,
|
||||
relationId);
|
||||
scanDescriptor = systable_beginscan(pgConstraint, scanIndexId, useIndex, NULL,
|
||||
scanKeyCount, scanKey);
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
while (HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
|
||||
|
||||
if (constraintForm->contype == CONSTRAINT_FOREIGN)
|
||||
{
|
||||
systable_endscan(scanDescriptor);
|
||||
heap_close(pgConstraint, NoLock);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
}
|
||||
|
||||
systable_endscan(scanDescriptor);
|
||||
heap_close(pgConstraint, NoLock);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -770,67 +770,6 @@ GetTableIndexAndConstraintCommands(Oid relationId)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetTableForeignConstraints takes in a relationId, and returns the list of foreign
|
||||
* constraint commands needed to reconstruct foreign constraints of that table.
|
||||
*/
|
||||
List *
|
||||
GetTableForeignConstraintCommands(Oid relationId)
|
||||
{
|
||||
List *tableForeignConstraints = NIL;
|
||||
|
||||
Relation pgConstraint = NULL;
|
||||
SysScanDesc scanDescriptor = NULL;
|
||||
ScanKeyData scanKey[1];
|
||||
int scanKeyCount = 1;
|
||||
HeapTuple heapTuple = NULL;
|
||||
|
||||
/*
|
||||
* Set search_path to NIL so that all objects outside of pg_catalog will be
|
||||
* schema-prefixed. pg_catalog will be added automatically when we call
|
||||
* PushOverrideSearchPath(), since we set addCatalog to true;
|
||||
*/
|
||||
OverrideSearchPath *overridePath = GetOverrideSearchPath(CurrentMemoryContext);
|
||||
overridePath->schemas = NIL;
|
||||
overridePath->addCatalog = true;
|
||||
PushOverrideSearchPath(overridePath);
|
||||
|
||||
/* open system catalog and scan all constraints that belong to this table */
|
||||
pgConstraint = heap_open(ConstraintRelationId, AccessShareLock);
|
||||
ScanKeyInit(&scanKey[0], Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ,
|
||||
relationId);
|
||||
scanDescriptor = systable_beginscan(pgConstraint, ConstraintRelidIndexId, true, NULL,
|
||||
scanKeyCount, scanKey);
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
while (HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
|
||||
|
||||
if (constraintForm->contype == CONSTRAINT_FOREIGN)
|
||||
{
|
||||
Oid constraintId = get_relation_constraint_oid(relationId,
|
||||
constraintForm->conname.data,
|
||||
true);
|
||||
char *statementDef = pg_get_constraintdef_command(constraintId);
|
||||
|
||||
tableForeignConstraints = lappend(tableForeignConstraints, statementDef);
|
||||
}
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
}
|
||||
|
||||
/* clean up scan and close system catalog */
|
||||
systable_endscan(scanDescriptor);
|
||||
heap_close(pgConstraint, AccessShareLock);
|
||||
|
||||
/* revert back to original search_path */
|
||||
PopOverrideSearchPath();
|
||||
|
||||
return tableForeignConstraints;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ShardStorageType returns the shard storage type according to relation type.
|
||||
*/
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
#include "catalog/pg_class.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/foreign_constraint.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
|
|
|
@ -30,6 +30,7 @@
|
|||
#include "distributed/citus_ruleutils.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/connection_management.h"
|
||||
#include "distributed/foreign_constraint.h"
|
||||
#include "distributed/multi_client_executor.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
|
|
|
@ -29,6 +29,7 @@
|
|||
#include "catalog/pg_type.h"
|
||||
#include "distributed/citus_ruleutils.h"
|
||||
#include "distributed/distribution_column.h"
|
||||
#include "distributed/foreign_constraint.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#include "access/htup_details.h"
|
||||
#include "access/genam.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/foreign_constraint.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
|
|
|
@ -0,0 +1,24 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
* foreign_constraint.h
|
||||
*
|
||||
* Copyright (c) 2016, Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#ifndef FOREIGN_CONSTRAINT_H
|
||||
#define FOREIGN_CONSTRAINT_H
|
||||
|
||||
#include "postgres.h"
|
||||
#include "postgres_ext.h"
|
||||
#include "utils/relcache.h"
|
||||
#include "nodes/primnodes.h"
|
||||
|
||||
extern void ErrorIfUnsupportedForeignConstraint(Relation relation, char
|
||||
distributionMethod,
|
||||
Var *distributionColumn, uint32
|
||||
colocationId);
|
||||
extern List * GetTableForeignConstraintCommands(Oid relationId);
|
||||
extern bool TableReferenced(Oid relationId);
|
||||
|
||||
#endif
|
|
@ -158,7 +158,6 @@ extern void EnsureTableOwner(Oid relationId);
|
|||
extern void EnsureSuperUser(void);
|
||||
extern void EnsureReplicationSettings(Oid relationId, char replicationModel);
|
||||
extern bool RegularTable(Oid relationId);
|
||||
extern bool TableReferenced(Oid relationId);
|
||||
extern char * ConstructQualifiedShardName(ShardInterval *shardInterval);
|
||||
extern Datum StringToDatum(char *inputString, Oid dataType);
|
||||
extern char * DatumToString(Datum datum, Oid dataType);
|
||||
|
|
|
@ -107,7 +107,6 @@ extern Oid ResolveRelationId(text *relationName);
|
|||
extern List * GetTableDDLEvents(Oid relationId, bool forShardCreation);
|
||||
extern List * GetTableCreationCommands(Oid relationId, bool forShardCreation);
|
||||
extern List * GetTableIndexAndConstraintCommands(Oid relationId);
|
||||
extern List * GetTableForeignConstraintCommands(Oid relationId);
|
||||
extern char ShardStorageType(Oid relationId);
|
||||
extern void CheckDistributedTable(Oid relationId);
|
||||
extern void CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
|
||||
|
|
Loading…
Reference in New Issue