mirror of https://github.com/citusdata/citus.git
Separate controls
Separte controls to make checks before and after query running on masterpull/1288/head
parent
4390bc09c3
commit
697084b532
|
@ -35,7 +35,6 @@
|
|||
#include "commands/extension.h"
|
||||
#include "commands/trigger.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/create_distributed_table.h"
|
||||
#include "distributed/distribution_column.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
|
@ -43,6 +42,7 @@
|
|||
#include "distributed/metadata_sync.h"
|
||||
#include "distributed/multi_copy.h"
|
||||
#include "distributed/multi_logical_planner.h"
|
||||
#include "distributed/multi_utility.h"
|
||||
#include "distributed/pg_dist_colocation.h"
|
||||
#include "distributed/pg_dist_partition.h"
|
||||
#include "distributed/reference_table_utils.h"
|
||||
|
@ -82,10 +82,6 @@ static char LookupDistributionMethod(Oid distributionMethodOid);
|
|||
static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
|
||||
int16 supportFunctionNumber);
|
||||
static bool LocalTableEmpty(Oid tableId);
|
||||
static void ErrorIfNotSupportedForeignConstraint(Relation relation,
|
||||
char distributionMethod,
|
||||
Var *distributionColumn,
|
||||
uint32 colocationId);
|
||||
static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
|
||||
char *colocateWithTableName,
|
||||
int shardCount, int replicationFactor);
|
||||
|
@ -419,348 +415,6 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfNotSupportedConstraint run checks related to unique index / exclude
|
||||
* constraints.
|
||||
*
|
||||
* The function skips the uniqeness checks for reference tables (i.e., distribution
|
||||
* method is 'none').
|
||||
*
|
||||
* Forbid UNIQUE, PRIMARY KEY, or EXCLUDE constraints on append partitioned
|
||||
* tables, since currently there is no way of enforcing uniqueness for
|
||||
* overlapping shards.
|
||||
*
|
||||
* Similarly, do not allow such constraints if they do not include partition
|
||||
* column. This check is important for two reasons:
|
||||
* i. First, currently Citus does not enforce uniqueness constraint on multiple
|
||||
* shards.
|
||||
* ii. Second, INSERT INTO .. ON CONFLICT (i.e., UPSERT) queries can be executed
|
||||
* with no further check for constraints.
|
||||
*/
|
||||
void
|
||||
ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod,
|
||||
Var *distributionColumn, uint32 colocationId)
|
||||
{
|
||||
char *relationName = NULL;
|
||||
List *indexOidList = NULL;
|
||||
ListCell *indexOidCell = NULL;
|
||||
|
||||
/*
|
||||
* We first perform check for foreign constraints. It is important to do this check
|
||||
* before next check, because other types of constraints are allowed on reference
|
||||
* tables and we return early for those constraints thanks to next check. Therefore,
|
||||
* for reference tables, we first check for foreing constraints and if they are OK,
|
||||
* we do not error out for other types of constraints.
|
||||
*/
|
||||
ErrorIfNotSupportedForeignConstraint(relation, distributionMethod, distributionColumn,
|
||||
colocationId);
|
||||
|
||||
/*
|
||||
* Citus supports any kind of uniqueness constraints for reference tables
|
||||
* given that they only consist of a single shard and we can simply rely on
|
||||
* Postgres.
|
||||
*/
|
||||
if (distributionMethod == DISTRIBUTE_BY_NONE)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
relationName = RelationGetRelationName(relation);
|
||||
indexOidList = RelationGetIndexList(relation);
|
||||
|
||||
foreach(indexOidCell, indexOidList)
|
||||
{
|
||||
Oid indexOid = lfirst_oid(indexOidCell);
|
||||
Relation indexDesc = index_open(indexOid, RowExclusiveLock);
|
||||
IndexInfo *indexInfo = NULL;
|
||||
AttrNumber *attributeNumberArray = NULL;
|
||||
bool hasDistributionColumn = false;
|
||||
int attributeCount = 0;
|
||||
int attributeIndex = 0;
|
||||
|
||||
/* extract index key information from the index's pg_index info */
|
||||
indexInfo = BuildIndexInfo(indexDesc);
|
||||
|
||||
/* only check unique indexes and exclusion constraints. */
|
||||
if (indexInfo->ii_Unique == false && indexInfo->ii_ExclusionOps == NULL)
|
||||
{
|
||||
index_close(indexDesc, NoLock);
|
||||
continue;
|
||||
}
|
||||
|
||||
/*
|
||||
* Citus cannot enforce uniqueness/exclusion constraints with overlapping shards.
|
||||
* Thus, emit a warning for unique indexes and exclusion constraints on
|
||||
* append partitioned tables.
|
||||
*/
|
||||
if (distributionMethod == DISTRIBUTE_BY_APPEND)
|
||||
{
|
||||
ereport(WARNING, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("table \"%s\" has a UNIQUE or EXCLUDE constraint",
|
||||
relationName),
|
||||
errdetail("UNIQUE constraints, EXCLUDE constraints, "
|
||||
"and PRIMARY KEYs on "
|
||||
"append-partitioned tables cannot be enforced."),
|
||||
errhint("Consider using hash partitioning.")));
|
||||
}
|
||||
|
||||
attributeCount = indexInfo->ii_NumIndexAttrs;
|
||||
attributeNumberArray = indexInfo->ii_KeyAttrNumbers;
|
||||
|
||||
for (attributeIndex = 0; attributeIndex < attributeCount; attributeIndex++)
|
||||
{
|
||||
AttrNumber attributeNumber = attributeNumberArray[attributeIndex];
|
||||
bool uniqueConstraint = false;
|
||||
bool exclusionConstraintWithEquality = false;
|
||||
|
||||
if (distributionColumn->varattno != attributeNumber)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
uniqueConstraint = indexInfo->ii_Unique;
|
||||
exclusionConstraintWithEquality = (indexInfo->ii_ExclusionOps != NULL &&
|
||||
OperatorImplementsEquality(
|
||||
indexInfo->ii_ExclusionOps[
|
||||
attributeIndex]));
|
||||
|
||||
if (uniqueConstraint || exclusionConstraintWithEquality)
|
||||
{
|
||||
hasDistributionColumn = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!hasDistributionColumn)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot distribute relation: \"%s\"",
|
||||
relationName),
|
||||
errdetail("Distributed relations cannot have UNIQUE, "
|
||||
"EXCLUDE, or PRIMARY KEY constraints that do not "
|
||||
"include the partition column (with an equality "
|
||||
"operator if EXCLUDE).")));
|
||||
}
|
||||
|
||||
index_close(indexDesc, NoLock);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfNotSupportedForeignConstraint runs checks related to foreign constraints and
|
||||
* errors out if it is not possible to create one of the foreign constraint in distributed
|
||||
* environment.
|
||||
*
|
||||
* To support foreign constraints, we require that;
|
||||
* - Referencing and referenced tables are hash distributed.
|
||||
* - Referencing and referenced tables are co-located.
|
||||
* - Foreign constraint is defined over distribution column.
|
||||
* - ON DELETE/UPDATE SET NULL, ON DELETE/UPDATE SET DEFAULT and ON UPDATE CASCADE options
|
||||
* are not used.
|
||||
* - Replication factors of referencing and referenced table are 1.
|
||||
*/
|
||||
static void
|
||||
ErrorIfNotSupportedForeignConstraint(Relation relation, char distributionMethod,
|
||||
Var *distributionColumn, uint32 colocationId)
|
||||
{
|
||||
Relation pgConstraint = NULL;
|
||||
SysScanDesc scanDescriptor = NULL;
|
||||
ScanKeyData scanKey[1];
|
||||
int scanKeyCount = 1;
|
||||
HeapTuple heapTuple = NULL;
|
||||
|
||||
Oid referencedTableId = InvalidOid;
|
||||
uint32 referencedTableColocationId = INVALID_COLOCATION_ID;
|
||||
Var *referencedTablePartitionColumn = NULL;
|
||||
|
||||
Datum referencingColumnsDatum;
|
||||
Datum *referencingColumnArray;
|
||||
int referencingColumnCount = 0;
|
||||
Datum referencedColumnsDatum;
|
||||
Datum *referencedColumnArray;
|
||||
int referencedColumnCount = 0;
|
||||
bool isNull = false;
|
||||
int attrIdx = 0;
|
||||
bool foreignConstraintOnPartitionColumn = false;
|
||||
bool selfReferencingTable = false;
|
||||
|
||||
pgConstraint = heap_open(ConstraintRelationId, AccessShareLock);
|
||||
ScanKeyInit(&scanKey[0], Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ,
|
||||
relation->rd_id);
|
||||
scanDescriptor = systable_beginscan(pgConstraint, ConstraintRelidIndexId, true, NULL,
|
||||
scanKeyCount, scanKey);
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
while (HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
|
||||
|
||||
if (constraintForm->contype != CONSTRAINT_FOREIGN)
|
||||
{
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
continue;
|
||||
}
|
||||
|
||||
referencedTableId = constraintForm->confrelid;
|
||||
selfReferencingTable = relation->rd_id == referencedTableId;
|
||||
|
||||
/*
|
||||
* We do not support foreign keys for reference tables. Here we skip the second
|
||||
* part of check if the table is a self referencing table because;
|
||||
* - PartitionMethod only works for distributed tables and this table is not
|
||||
* distributed yet.
|
||||
* - Since referencing and referenced tables are same, it is OK to not checking
|
||||
* distribution method twice.
|
||||
*/
|
||||
if (distributionMethod == DISTRIBUTE_BY_NONE ||
|
||||
(!selfReferencingTable &&
|
||||
PartitionMethod(referencedTableId) == DISTRIBUTE_BY_NONE))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create foreign key constraint"),
|
||||
errdetail("Foreign key constraints are not allowed from or "
|
||||
"to reference tables.")));
|
||||
}
|
||||
|
||||
/*
|
||||
* ON DELETE SET NULL and ON DELETE SET DEFAULT is not supported. Because we do
|
||||
* not want to set partition column to NULL or default value.
|
||||
*/
|
||||
if (constraintForm->confdeltype == FKCONSTR_ACTION_SETNULL ||
|
||||
constraintForm->confdeltype == FKCONSTR_ACTION_SETDEFAULT)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create foreign key constraint"),
|
||||
errdetail("SET NULL or SET DEFAULT is not supported"
|
||||
" in ON DELETE operation.")));
|
||||
}
|
||||
|
||||
/*
|
||||
* ON UPDATE SET NULL, ON UPDATE SET DEFAULT and UPDATE CASCADE is not supported.
|
||||
* Because we do not want to set partition column to NULL or default value. Also
|
||||
* cascading update operation would require re-partitioning. Updating partition
|
||||
* column value is not allowed anyway even outside of foreign key concept.
|
||||
*/
|
||||
if (constraintForm->confupdtype == FKCONSTR_ACTION_SETNULL ||
|
||||
constraintForm->confupdtype == FKCONSTR_ACTION_SETDEFAULT ||
|
||||
constraintForm->confupdtype == FKCONSTR_ACTION_CASCADE)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create foreign key constraint"),
|
||||
errdetail("SET NULL, SET DEFAULT or CASCADE is not"
|
||||
" supported in ON UPDATE operation.")));
|
||||
}
|
||||
|
||||
/*
|
||||
* Some checks are not meaningful if foreign key references the table itself.
|
||||
* Therefore we will skip those checks.
|
||||
*/
|
||||
if (!selfReferencingTable)
|
||||
{
|
||||
if (!IsDistributedTable(referencedTableId))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
|
||||
errmsg("cannot create foreign key constraint"),
|
||||
errdetail("Referenced table must be a distributed "
|
||||
"table.")));
|
||||
}
|
||||
|
||||
/* to enforce foreign constraints, tables must be co-located */
|
||||
referencedTableColocationId = TableColocationId(referencedTableId);
|
||||
if (relation->rd_id != referencedTableId &&
|
||||
(colocationId == INVALID_COLOCATION_ID ||
|
||||
colocationId != referencedTableColocationId))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create foreign key constraint"),
|
||||
errdetail("Foreign key constraint can only be created"
|
||||
" on co-located tables.")));
|
||||
}
|
||||
|
||||
/*
|
||||
* Partition column must exist in both referencing and referenced side of the
|
||||
* foreign key constraint. They also must be in same ordinal.
|
||||
*/
|
||||
referencedTablePartitionColumn = PartitionKey(referencedTableId);
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Partition column must exist in both referencing and referenced side of the
|
||||
* foreign key constraint. They also must be in same ordinal.
|
||||
*/
|
||||
referencedTablePartitionColumn = distributionColumn;
|
||||
}
|
||||
|
||||
/*
|
||||
* Column attributes are not available in Form_pg_constraint, therefore we need
|
||||
* to find them in the system catalog. After finding them, we iterate over column
|
||||
* attributes together because partition column must be at the same place in both
|
||||
* referencing and referenced side of the foreign key constraint
|
||||
*/
|
||||
referencingColumnsDatum = SysCacheGetAttr(CONSTROID, heapTuple,
|
||||
Anum_pg_constraint_conkey, &isNull);
|
||||
referencedColumnsDatum = SysCacheGetAttr(CONSTROID, heapTuple,
|
||||
Anum_pg_constraint_confkey, &isNull);
|
||||
|
||||
deconstruct_array(DatumGetArrayTypeP(referencingColumnsDatum), INT2OID, 2, true,
|
||||
's', &referencingColumnArray, NULL, &referencingColumnCount);
|
||||
deconstruct_array(DatumGetArrayTypeP(referencedColumnsDatum), INT2OID, 2, true,
|
||||
's', &referencedColumnArray, NULL, &referencedColumnCount);
|
||||
|
||||
Assert(referencingColumnCount == referencedColumnCount);
|
||||
|
||||
for (attrIdx = 0; attrIdx < referencingColumnCount; ++attrIdx)
|
||||
{
|
||||
AttrNumber referencingAttrNo = DatumGetInt16(referencingColumnArray[attrIdx]);
|
||||
AttrNumber referencedAttrNo = DatumGetInt16(referencedColumnArray[attrIdx]);
|
||||
|
||||
if (distributionColumn->varattno == referencingAttrNo &&
|
||||
referencedTablePartitionColumn->varattno == referencedAttrNo)
|
||||
{
|
||||
foreignConstraintOnPartitionColumn = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!foreignConstraintOnPartitionColumn)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create foreign key constraint"),
|
||||
errdetail("Partition column must exist both "
|
||||
"referencing and referenced side of the "
|
||||
"foreign constraint statement and it must "
|
||||
"be in the same ordinal in both sides.")));
|
||||
}
|
||||
|
||||
/*
|
||||
* We do not allow to create foreign constraints if shard replication factor is
|
||||
* greater than 1. Because in our current design, multiple replicas may cause
|
||||
* locking problems and inconsistent shard contents.
|
||||
*/
|
||||
if (ShardReplicationFactor > 1 || (referencedTableId != relation->rd_id &&
|
||||
!SingleReplicatedTable(referencedTableId)))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create foreign key constraint"),
|
||||
errdetail("Citus Community Edition currently supports "
|
||||
"foreign key constraints only for "
|
||||
"\"citus.shard_replication_factor = 1\"."),
|
||||
errhint("Please change \"citus.shard_replication_factor to "
|
||||
"1\". To learn more about using foreign keys with "
|
||||
"other replication factors, please contact us at "
|
||||
"https://citusdata.com/about/contact_us.")));
|
||||
}
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
}
|
||||
|
||||
/* clean up scan and close system catalog */
|
||||
systable_endscan(scanDescriptor);
|
||||
heap_close(pgConstraint, AccessShareLock);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* LookupDistributionMethod maps the oids of citus.distribution_type enum
|
||||
* values to pg_dist_partition.partmethod values.
|
||||
|
|
|
@ -24,15 +24,20 @@
|
|||
#include "catalog/catalog.h"
|
||||
#include "catalog/dependency.h"
|
||||
#include "catalog/index.h"
|
||||
#include "catalog/indexing.h"
|
||||
#include "catalog/namespace.h"
|
||||
#include "catalog/pg_attribute.h"
|
||||
#include "catalog/pg_class.h"
|
||||
#include "catalog/pg_constraint.h"
|
||||
#if (PG_VERSION_NUM >= 90600)
|
||||
#include "catalog/pg_constraint_fn.h"
|
||||
#endif
|
||||
#include "catalog/pg_type.h"
|
||||
#include "commands/defrem.h"
|
||||
#include "commands/tablecmds.h"
|
||||
#include "commands/prepare.h"
|
||||
#include "distributed/citus_ruleutils.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/create_distributed_table.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
|
@ -69,6 +74,7 @@
|
|||
#include "utils/builtins.h"
|
||||
#include "utils/elog.h"
|
||||
#include "utils/errcodes.h"
|
||||
#include "utils/fmgroids.h"
|
||||
#include "utils/guc.h"
|
||||
#include "utils/hsearch.h"
|
||||
#include "utils/inval.h"
|
||||
|
@ -1364,40 +1370,6 @@ ErrorIfUnsupportedDropIndexStmt(DropStmt *dropIndexStatement)
|
|||
}
|
||||
|
||||
|
||||
static void
|
||||
ErrorIfUnsupportedAlterAddConstraintStmt(AlterTableStmt *alterTableStatement)
|
||||
{
|
||||
Oid relationId = InvalidOid;
|
||||
bool isDistributedRelation = false;
|
||||
LOCKMODE lockmode = 0;
|
||||
|
||||
Relation relation = NULL;
|
||||
char distributionMethod;
|
||||
Var *distributionColumn = NULL;
|
||||
uint32 colocationId = 0;
|
||||
|
||||
/* continue if it is not distributed table */
|
||||
lockmode = AlterTableGetLockLevel(alterTableStatement->cmds);
|
||||
relationId = AlterTableLookupRelation(alterTableStatement, lockmode);
|
||||
|
||||
isDistributedRelation = IsDistributedTable(relationId);
|
||||
if (!isDistributedRelation)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
distributionMethod = PartitionMethod(relationId);
|
||||
distributionColumn = PartitionKey(relationId);
|
||||
colocationId = TableColocationId(relationId);
|
||||
|
||||
relation = relation_open(relationId, ExclusiveLock);
|
||||
ErrorIfNotSupportedConstraint(relation, distributionMethod,
|
||||
distributionColumn, colocationId);
|
||||
relation_close(relation, NoLock);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfUnsupportedAlterTableStmt checks if the corresponding alter table statement
|
||||
* is supported for distributed tables and errors out if it is not. Currently,
|
||||
|
@ -1487,7 +1459,7 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
|
|||
/* reference tables do not have partition column, so allow them */
|
||||
if (partitionColumn != NULL &&
|
||||
targetAttr->attnum == partitionColumn->varattno)
|
||||
{ /* TODO : CHANGE IT ! */
|
||||
{
|
||||
ereport(ERROR, (errmsg("cannot execute ALTER TABLE command "
|
||||
"involving partition column")));
|
||||
}
|
||||
|
@ -1547,6 +1519,381 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfNotSupportedConstraint run checks related to unique index / exclude
|
||||
* constraints.
|
||||
*
|
||||
* The function skips the uniqeness checks for reference tables (i.e., distribution
|
||||
* method is 'none').
|
||||
*
|
||||
* Forbid UNIQUE, PRIMARY KEY, or EXCLUDE constraints on append partitioned
|
||||
* tables, since currently there is no way of enforcing uniqueness for
|
||||
* overlapping shards.
|
||||
*
|
||||
* Similarly, do not allow such constraints if they do not include partition
|
||||
* column. This check is important for two reasons:
|
||||
* i. First, currently Citus does not enforce uniqueness constraint on multiple
|
||||
* shards.
|
||||
* ii. Second, INSERT INTO .. ON CONFLICT (i.e., UPSERT) queries can be executed
|
||||
* with no further check for constraints.
|
||||
*/
|
||||
void
|
||||
ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod,
|
||||
Var *distributionColumn, uint32 colocationId)
|
||||
{
|
||||
char *relationName = NULL;
|
||||
List *indexOidList = NULL;
|
||||
ListCell *indexOidCell = NULL;
|
||||
|
||||
/*
|
||||
* We first perform check for foreign constraints. It is important to do this check
|
||||
* before next check, because other types of constraints are allowed on reference
|
||||
* tables and we return early for those constraints thanks to next check. Therefore,
|
||||
* for reference tables, we first check for foreing constraints and if they are OK,
|
||||
* we do not error out for other types of constraints.
|
||||
*/
|
||||
ErrorIfNotSupportedForeignConstraint(relation, distributionMethod, distributionColumn,
|
||||
colocationId);
|
||||
|
||||
/*
|
||||
* Citus supports any kind of uniqueness constraints for reference tables
|
||||
* given that they only consist of a single shard and we can simply rely on
|
||||
* Postgres.
|
||||
*/
|
||||
if (distributionMethod == DISTRIBUTE_BY_NONE)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
relationName = RelationGetRelationName(relation);
|
||||
indexOidList = RelationGetIndexList(relation);
|
||||
|
||||
foreach(indexOidCell, indexOidList)
|
||||
{
|
||||
Oid indexOid = lfirst_oid(indexOidCell);
|
||||
Relation indexDesc = index_open(indexOid, RowExclusiveLock);
|
||||
IndexInfo *indexInfo = NULL;
|
||||
AttrNumber *attributeNumberArray = NULL;
|
||||
bool hasDistributionColumn = false;
|
||||
int attributeCount = 0;
|
||||
int attributeIndex = 0;
|
||||
|
||||
/* extract index key information from the index's pg_index info */
|
||||
indexInfo = BuildIndexInfo(indexDesc);
|
||||
|
||||
/* only check unique indexes and exclusion constraints. */
|
||||
if (indexInfo->ii_Unique == false && indexInfo->ii_ExclusionOps == NULL)
|
||||
{
|
||||
index_close(indexDesc, NoLock);
|
||||
continue;
|
||||
}
|
||||
|
||||
/*
|
||||
* Citus cannot enforce uniqueness/exclusion constraints with overlapping shards.
|
||||
* Thus, emit a warning for unique indexes and exclusion constraints on
|
||||
* append partitioned tables.
|
||||
*/
|
||||
if (distributionMethod == DISTRIBUTE_BY_APPEND)
|
||||
{
|
||||
ereport(WARNING, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("table \"%s\" has a UNIQUE or EXCLUDE constraint",
|
||||
relationName),
|
||||
errdetail("UNIQUE constraints, EXCLUDE constraints, "
|
||||
"and PRIMARY KEYs on "
|
||||
"append-partitioned tables cannot be enforced."),
|
||||
errhint("Consider using hash partitioning.")));
|
||||
}
|
||||
|
||||
attributeCount = indexInfo->ii_NumIndexAttrs;
|
||||
attributeNumberArray = indexInfo->ii_KeyAttrNumbers;
|
||||
|
||||
for (attributeIndex = 0; attributeIndex < attributeCount; attributeIndex++)
|
||||
{
|
||||
AttrNumber attributeNumber = attributeNumberArray[attributeIndex];
|
||||
bool uniqueConstraint = false;
|
||||
bool exclusionConstraintWithEquality = false;
|
||||
|
||||
if (distributionColumn->varattno != attributeNumber)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
uniqueConstraint = indexInfo->ii_Unique;
|
||||
exclusionConstraintWithEquality = (indexInfo->ii_ExclusionOps != NULL &&
|
||||
OperatorImplementsEquality(
|
||||
indexInfo->ii_ExclusionOps[
|
||||
attributeIndex]));
|
||||
|
||||
if (uniqueConstraint || exclusionConstraintWithEquality)
|
||||
{
|
||||
hasDistributionColumn = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!hasDistributionColumn)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot distribute relation: \"%s\"",
|
||||
relationName),
|
||||
errdetail("Distributed relations cannot have UNIQUE, "
|
||||
"EXCLUDE, or PRIMARY KEY constraints that do not "
|
||||
"include the partition column (with an equality "
|
||||
"operator if EXCLUDE).")));
|
||||
}
|
||||
|
||||
index_close(indexDesc, NoLock);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfNotSupportedForeignConstraint runs checks related to foreign constraints and
|
||||
* errors out if it is not possible to create one of the foreign constraint in distributed
|
||||
* environment.
|
||||
*
|
||||
* To support foreign constraints, we require that;
|
||||
* - Referencing and referenced tables are hash distributed.
|
||||
* - Referencing and referenced tables are co-located.
|
||||
* - Foreign constraint is defined over distribution column.
|
||||
* - ON DELETE/UPDATE SET NULL, ON DELETE/UPDATE SET DEFAULT and ON UPDATE CASCADE options
|
||||
* are not used.
|
||||
* - Replication factors of referencing and referenced table are 1.
|
||||
*/
|
||||
void
|
||||
ErrorIfNotSupportedForeignConstraint(Relation relation, char distributionMethod,
|
||||
Var *distributionColumn, uint32 colocationId)
|
||||
{
|
||||
Relation pgConstraint = NULL;
|
||||
SysScanDesc scanDescriptor = NULL;
|
||||
ScanKeyData scanKey[1];
|
||||
int scanKeyCount = 1;
|
||||
HeapTuple heapTuple = NULL;
|
||||
|
||||
Oid referencedTableId = InvalidOid;
|
||||
uint32 referencedTableColocationId = INVALID_COLOCATION_ID;
|
||||
Var *referencedTablePartitionColumn = NULL;
|
||||
|
||||
Datum referencingColumnsDatum;
|
||||
Datum *referencingColumnArray;
|
||||
int referencingColumnCount = 0;
|
||||
Datum referencedColumnsDatum;
|
||||
Datum *referencedColumnArray;
|
||||
int referencedColumnCount = 0;
|
||||
bool isNull = false;
|
||||
int attrIdx = 0;
|
||||
bool foreignConstraintOnPartitionColumn = false;
|
||||
bool selfReferencingTable = false;
|
||||
|
||||
pgConstraint = heap_open(ConstraintRelationId, AccessShareLock);
|
||||
ScanKeyInit(&scanKey[0], Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ,
|
||||
relation->rd_id);
|
||||
scanDescriptor = systable_beginscan(pgConstraint, ConstraintRelidIndexId, true, NULL,
|
||||
scanKeyCount, scanKey);
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
while (HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
|
||||
|
||||
if (constraintForm->contype != CONSTRAINT_FOREIGN)
|
||||
{
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
continue;
|
||||
}
|
||||
|
||||
referencedTableId = constraintForm->confrelid;
|
||||
selfReferencingTable = relation->rd_id == referencedTableId;
|
||||
|
||||
/*
|
||||
* We do not support foreign keys for reference tables. Here we skip the second
|
||||
* part of check if the table is a self referencing table because;
|
||||
* - PartitionMethod only works for distributed tables and this table is not
|
||||
* distributed yet.
|
||||
* - Since referencing and referenced tables are same, it is OK to not checking
|
||||
* distribution method twice.
|
||||
*/
|
||||
if (distributionMethod == DISTRIBUTE_BY_NONE ||
|
||||
(!selfReferencingTable &&
|
||||
PartitionMethod(referencedTableId) == DISTRIBUTE_BY_NONE))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create foreign key constraint"),
|
||||
errdetail("Foreign key constraints are not allowed from or "
|
||||
"to reference tables.")));
|
||||
}
|
||||
|
||||
/*
|
||||
* ON DELETE SET NULL and ON DELETE SET DEFAULT is not supported. Because we do
|
||||
* not want to set partition column to NULL or default value.
|
||||
*/
|
||||
if (constraintForm->confdeltype == FKCONSTR_ACTION_SETNULL ||
|
||||
constraintForm->confdeltype == FKCONSTR_ACTION_SETDEFAULT)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create foreign key constraint"),
|
||||
errdetail("SET NULL or SET DEFAULT is not supported"
|
||||
" in ON DELETE operation.")));
|
||||
}
|
||||
|
||||
/*
|
||||
* ON UPDATE SET NULL, ON UPDATE SET DEFAULT and UPDATE CASCADE is not supported.
|
||||
* Because we do not want to set partition column to NULL or default value. Also
|
||||
* cascading update operation would require re-partitioning. Updating partition
|
||||
* column value is not allowed anyway even outside of foreign key concept.
|
||||
*/
|
||||
if (constraintForm->confupdtype == FKCONSTR_ACTION_SETNULL ||
|
||||
constraintForm->confupdtype == FKCONSTR_ACTION_SETDEFAULT ||
|
||||
constraintForm->confupdtype == FKCONSTR_ACTION_CASCADE)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create foreign key constraint"),
|
||||
errdetail("SET NULL, SET DEFAULT or CASCADE is not"
|
||||
" supported in ON UPDATE operation.")));
|
||||
}
|
||||
|
||||
/*
|
||||
* Some checks are not meaningful if foreign key references the table itself.
|
||||
* Therefore we will skip those checks.
|
||||
*/
|
||||
if (!selfReferencingTable)
|
||||
{
|
||||
if (!IsDistributedTable(referencedTableId))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
|
||||
errmsg("cannot create foreign key constraint"),
|
||||
errdetail("Referenced table must be a distributed "
|
||||
"table.")));
|
||||
}
|
||||
|
||||
/* to enforce foreign constraints, tables must be co-located */
|
||||
referencedTableColocationId = TableColocationId(referencedTableId);
|
||||
if (relation->rd_id != referencedTableId &&
|
||||
(colocationId == INVALID_COLOCATION_ID ||
|
||||
colocationId != referencedTableColocationId))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create foreign key constraint"),
|
||||
errdetail("Foreign key constraint can only be created"
|
||||
" on co-located tables.")));
|
||||
}
|
||||
|
||||
/*
|
||||
* Partition column must exist in both referencing and referenced side of the
|
||||
* foreign key constraint. They also must be in same ordinal.
|
||||
*/
|
||||
referencedTablePartitionColumn = PartitionKey(referencedTableId);
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Partition column must exist in both referencing and referenced side of the
|
||||
* foreign key constraint. They also must be in same ordinal.
|
||||
*/
|
||||
referencedTablePartitionColumn = distributionColumn;
|
||||
}
|
||||
|
||||
/*
|
||||
* Column attributes are not available in Form_pg_constraint, therefore we need
|
||||
* to find them in the system catalog. After finding them, we iterate over column
|
||||
* attributes together because partition column must be at the same place in both
|
||||
* referencing and referenced side of the foreign key constraint
|
||||
*/
|
||||
referencingColumnsDatum = SysCacheGetAttr(CONSTROID, heapTuple,
|
||||
Anum_pg_constraint_conkey, &isNull);
|
||||
referencedColumnsDatum = SysCacheGetAttr(CONSTROID, heapTuple,
|
||||
Anum_pg_constraint_confkey, &isNull);
|
||||
|
||||
deconstruct_array(DatumGetArrayTypeP(referencingColumnsDatum), INT2OID, 2, true,
|
||||
's', &referencingColumnArray, NULL, &referencingColumnCount);
|
||||
deconstruct_array(DatumGetArrayTypeP(referencedColumnsDatum), INT2OID, 2, true,
|
||||
's', &referencedColumnArray, NULL, &referencedColumnCount);
|
||||
|
||||
Assert(referencingColumnCount == referencedColumnCount);
|
||||
|
||||
for (attrIdx = 0; attrIdx < referencingColumnCount; ++attrIdx)
|
||||
{
|
||||
AttrNumber referencingAttrNo = DatumGetInt16(referencingColumnArray[attrIdx]);
|
||||
AttrNumber referencedAttrNo = DatumGetInt16(referencedColumnArray[attrIdx]);
|
||||
|
||||
if (distributionColumn->varattno == referencingAttrNo &&
|
||||
referencedTablePartitionColumn->varattno == referencedAttrNo)
|
||||
{
|
||||
foreignConstraintOnPartitionColumn = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!foreignConstraintOnPartitionColumn)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create foreign key constraint"),
|
||||
errdetail("Partition column must exist both "
|
||||
"referencing and referenced side of the "
|
||||
"foreign constraint statement and it must "
|
||||
"be in the same ordinal in both sides.")));
|
||||
}
|
||||
|
||||
/*
|
||||
* We do not allow to create foreign constraints if shard replication factor is
|
||||
* greater than 1. Because in our current design, multiple replicas may cause
|
||||
* locking problems and inconsistent shard contents.
|
||||
*/
|
||||
if (ShardReplicationFactor > 1 || (referencedTableId != relation->rd_id &&
|
||||
!SingleReplicatedTable(referencedTableId)))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create foreign key constraint"),
|
||||
errdetail("Citus Community Edition currently supports "
|
||||
"foreign key constraints only for "
|
||||
"\"citus.shard_replication_factor = 1\"."),
|
||||
errhint("Please change \"citus.shard_replication_factor to "
|
||||
"1\". To learn more about using foreign keys with "
|
||||
"other replication factors, please contact us at "
|
||||
"https://citusdata.com/about/contact_us.")));
|
||||
}
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
}
|
||||
|
||||
/* clean up scan and close system catalog */
|
||||
systable_endscan(scanDescriptor);
|
||||
heap_close(pgConstraint, AccessShareLock);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
ErrorIfUnsupportedAlterAddConstraintStmt(AlterTableStmt *alterTableStatement)
|
||||
{
|
||||
Oid relationId = InvalidOid;
|
||||
bool isDistributedRelation = false;
|
||||
LOCKMODE lockmode = 0;
|
||||
|
||||
Relation relation = NULL;
|
||||
char distributionMethod;
|
||||
Var *distributionColumn = NULL;
|
||||
uint32 colocationId = 0;
|
||||
|
||||
/* continue if it is not distributed table */
|
||||
lockmode = AlterTableGetLockLevel(alterTableStatement->cmds);
|
||||
relationId = AlterTableLookupRelation(alterTableStatement, lockmode);
|
||||
|
||||
isDistributedRelation = IsDistributedTable(relationId);
|
||||
if (!isDistributedRelation)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
distributionMethod = PartitionMethod(relationId);
|
||||
distributionColumn = PartitionKey(relationId);
|
||||
colocationId = TableColocationId(relationId);
|
||||
|
||||
relation = relation_open(relationId, ExclusiveLock);
|
||||
ErrorIfNotSupportedConstraint(relation, distributionMethod,
|
||||
distributionColumn, colocationId);
|
||||
relation_close(relation, NoLock);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfUnsupportedSeqStmt errors out if the provided create sequence
|
||||
* statement specifies a distributed table in its OWNED BY clause.
|
||||
|
|
|
@ -1,16 +0,0 @@
|
|||
/*
|
||||
* create_distributed_table.h
|
||||
*
|
||||
* Created on: Mar 17, 2017
|
||||
* Author: velioglub
|
||||
*/
|
||||
|
||||
#ifndef SRC_INCLUDE_DISTRIBUTED_CREATE_DISTRIBUTED_TABLE_H_
|
||||
#define SRC_INCLUDE_DISTRIBUTED_CREATE_DISTRIBUTED_TABLE_H_
|
||||
|
||||
extern void ErrorIfNotSupportedConstraint(Relation relation,
|
||||
char distributionMethod,
|
||||
Var *distributionColumn,
|
||||
uint32 colocationId);
|
||||
|
||||
#endif /* SRC_INCLUDE_DISTRIBUTED_CREATE_DISTRIBUTED_TABLE_H_ */
|
|
@ -25,6 +25,11 @@ extern void multi_ProcessUtility(Node *parsetree, const char *queryString,
|
|||
ProcessUtilityContext context, ParamListInfo params,
|
||||
DestReceiver *dest, char *completionTag);
|
||||
extern void ReplicateGrantStmt(Node *parsetree);
|
||||
|
||||
extern void ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod,
|
||||
Var *distributionColumn, uint32 colocationId);
|
||||
extern void ErrorIfNotSupportedForeignConstraint(Relation relation,
|
||||
char distributionMethod,
|
||||
Var *distributionColumn,
|
||||
uint32 colocationId);
|
||||
|
||||
#endif /* MULTI_UTILITY_H */
|
||||
|
|
Loading…
Reference in New Issue