Foreign Constraint Support for create_distributed_table and shard move

With this change, we now push down foreign key constraints created during CREATE TABLE
statements. We also start to send foreign constraints during shard move along with
other DDL statements
pull/888/head
Burak Yucesoy 2016-10-19 14:12:46 +03:00 committed by Burak Yücesoy
parent 84babaa58e
commit 5a03acf2bf
22 changed files with 1174 additions and 160 deletions

View File

@ -8,7 +8,7 @@ EXTENSION = citus
EXTVERSIONS = 5.0 5.0-1 5.0-2 \ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
5.2-1 5.2-2 5.2-3 5.2-4 \ 5.2-1 5.2-2 5.2-3 5.2-4 \
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13
# All citus--*.sql files in the source directory # All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -82,6 +82,8 @@ $(EXTENSION)--6.0-11.sql: $(EXTENSION)--6.0-10.sql $(EXTENSION)--6.0-10--6.0-11.
cat $^ > $@ cat $^ > $@
$(EXTENSION)--6.0-12.sql: $(EXTENSION)--6.0-11.sql $(EXTENSION)--6.0-11--6.0-12.sql $(EXTENSION)--6.0-12.sql: $(EXTENSION)--6.0-11.sql $(EXTENSION)--6.0-11--6.0-12.sql
cat $^ > $@ cat $^ > $@
$(EXTENSION)--6.0-13.sql: $(EXTENSION)--6.0-12.sql $(EXTENSION)--6.0-12--6.0-13.sql
cat $^ > $@
NO_PGXS = 1 NO_PGXS = 1

View File

@ -0,0 +1,16 @@
/* citus--6.0-12--6.0-13.sql */
CREATE FUNCTION pg_catalog.worker_apply_inter_shard_ddl_command(referencing_shard bigint,
referencing_schema_name text,
referenced_shard bigint,
referenced_schema_name text,
command text)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_apply_inter_shard_ddl_command$$;
COMMENT ON FUNCTION pg_catalog.worker_apply_inter_shard_ddl_command(referencing_shard bigint,
referencing_schema_name text,
referenced_shard bigint,
referenced_schema_name text,
command text)
IS 'executes inter shard ddl command';

View File

@ -1,6 +1,6 @@
# Citus extension # Citus extension
comment = 'Citus distributed database' comment = 'Citus distributed database'
default_version = '6.0-12' default_version = '6.0-13'
module_pathname = '$libdir/citus' module_pathname = '$libdir/citus'
relocatable = false relocatable = false
schema = pg_catalog schema = pg_catalog

View File

@ -22,10 +22,15 @@
#include "catalog/index.h" #include "catalog/index.h"
#include "catalog/indexing.h" #include "catalog/indexing.h"
#include "catalog/pg_am.h" #include "catalog/pg_am.h"
#include "catalog/pg_constraint.h"
#if (PG_VERSION_NUM >= 90600)
#include "catalog/pg_constraint_fn.h"
#endif
#include "catalog/pg_enum.h" #include "catalog/pg_enum.h"
#include "catalog/pg_extension.h" #include "catalog/pg_extension.h"
#include "catalog/pg_opclass.h" #include "catalog/pg_opclass.h"
#include "catalog/pg_trigger.h" #include "catalog/pg_trigger.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h" #include "commands/defrem.h"
#include "commands/extension.h" #include "commands/extension.h"
#include "commands/sequence.h" #include "commands/sequence.h"
@ -64,7 +69,11 @@ static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
int16 supportFunctionNumber); int16 supportFunctionNumber);
static bool LocalTableEmpty(Oid tableId); static bool LocalTableEmpty(Oid tableId);
static void ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod, static void ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod,
Var *distributionColumn); Var *distributionColumn, uint32 colocationId);
static void ErrorIfNotSupportedForeignConstraint(Relation relation,
char distributionMethod,
Var *distributionColumn,
uint32 colocationId);
static void InsertIntoPgDistPartition(Oid relationId, char distributionMethod, static void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
Var *distributionColumn, uint32 colocationId); Var *distributionColumn, uint32 colocationId);
static void CreateTruncateTrigger(Oid relationId); static void CreateTruncateTrigger(Oid relationId);
@ -270,7 +279,8 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
} }
} }
ErrorIfNotSupportedConstraint(relation, distributionMethod, distributionColumn); ErrorIfNotSupportedConstraint(relation, distributionMethod, distributionColumn,
colocationId);
InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn, InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn,
colocationId); colocationId);
@ -305,7 +315,7 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
*/ */
static void static void
ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod, ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod,
Var *distributionColumn) Var *distributionColumn, uint32 colocationId)
{ {
char *relationName = RelationGetRelationName(relation); char *relationName = RelationGetRelationName(relation);
List *indexOidList = RelationGetIndexList(relation); List *indexOidList = RelationGetIndexList(relation);
@ -387,6 +397,186 @@ ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod,
index_close(indexDesc, NoLock); index_close(indexDesc, NoLock);
} }
/* we also perform check for foreign constraints */
ErrorIfNotSupportedForeignConstraint(relation, distributionMethod, distributionColumn,
colocationId);
}
/*
* ErrorIfNotSupportedForeignConstraint runs checks related to foreign constraints and
* errors out if it is not possible to create one of the foreign constraint in distributed
* environment.
*
* To support foreign constraints, we require that;
* - Referencing and referenced tables are hash distributed.
* - Referencing and referenced tables are co-located.
* - Foreign constraint is defined over distribution column.
* - ON DELETE/UPDATE SET NULL, ON DELETE/UPDATE SET DEFAULT and ON UPDATE CASCADE options
* are not used.
* - Replication factors of referencing and referenced table are 1.
*/
static void
ErrorIfNotSupportedForeignConstraint(Relation relation, char distributionMethod,
Var *distributionColumn, uint32 colocationId)
{
Relation pgConstraint = NULL;
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[1];
int scanKeyCount = 1;
HeapTuple heapTuple = NULL;
Oid referencedTableId = InvalidOid;
uint32 referencedTableColocationId = INVALID_COLOCATION_ID;
Var *referencedTablePartitionColumn = NULL;
Datum referencingColumnsDatum;
Datum *referencingColumnArray;
int referencingColumnCount = 0;
Datum referencedColumnsDatum;
Datum *referencedColumnArray;
int referencedColumnCount = 0;
bool isNull = false;
int attrIdx = 0;
bool foreignConstraintOnPartitionColumn = false;
pgConstraint = heap_open(ConstraintRelationId, AccessShareLock);
ScanKeyInit(&scanKey[0], Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ,
relation->rd_id);
scanDescriptor = systable_beginscan(pgConstraint, ConstraintRelidIndexId, true, NULL,
scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{
Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
if (constraintForm->contype != CONSTRAINT_FOREIGN)
{
heapTuple = systable_getnext(scanDescriptor);
continue;
}
/*
* ON DELETE SET NULL and ON DELETE SET DEFAULT is not supported. Because we do
* not want to set partition column to NULL or default value.
*/
if (constraintForm->confdeltype == FKCONSTR_ACTION_SETNULL ||
constraintForm->confdeltype == FKCONSTR_ACTION_SETDEFAULT)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create foreign key constraint"),
errdetail("SET NULL or SET DEFAULT is not supported"
" in ON DELETE operation.")));
}
/*
* ON UPDATE SET NULL, ON UPDATE SET DEFAULT and UPDATE CASCADE is not supported.
* Because we do not want to set partition column to NULL or default value. Also
* cascading update operation would require re-partitioning. Updating partition
* column value is not allowed anyway even outside of foreign key concept.
*/
if (constraintForm->confupdtype == FKCONSTR_ACTION_SETNULL ||
constraintForm->confupdtype == FKCONSTR_ACTION_SETDEFAULT ||
constraintForm->confupdtype == FKCONSTR_ACTION_CASCADE)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create foreign key constraint"),
errdetail("SET NULL, SET DEFAULT or CASCADE is not"
" supported in ON UPDATE operation.")));
}
/* to enforce foreign constraints, tables must be co-located */
referencedTableId = constraintForm->confrelid;
/* check that the relation is not already distributed */
if (!IsDistributedTable(referencedTableId))
{
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("cannot create foreign key constraint"),
errdetail("Referenced table must be a distributed table.")));
}
referencedTableColocationId = TableColocationId(referencedTableId);
if (relation->rd_id != referencedTableId &&
(colocationId == INVALID_COLOCATION_ID ||
colocationId != referencedTableColocationId))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create foreign key constraint"),
errdetail("Foreign key constraint can only be created"
" on co-located tables.")));
}
/*
* Partition column must exist in both referencing and referenced side of the
* foreign key constraint. They also must be in same ordinal.
*/
referencedTablePartitionColumn = PartitionKey(referencedTableId);
/*
* Column attributes are not available in Form_pg_constraint, therefore we need
* to find them in the system catalog. After finding them, we iterate over column
* attributes together because partition column must be at the same place in both
* referencing and referenced side of the foreign key constraint
*/
referencingColumnsDatum = SysCacheGetAttr(CONSTROID, heapTuple,
Anum_pg_constraint_conkey, &isNull);
referencedColumnsDatum = SysCacheGetAttr(CONSTROID, heapTuple,
Anum_pg_constraint_confkey, &isNull);
deconstruct_array(DatumGetArrayTypeP(referencingColumnsDatum), INT2OID, 2, true,
's', &referencingColumnArray, NULL, &referencingColumnCount);
deconstruct_array(DatumGetArrayTypeP(referencedColumnsDatum), INT2OID, 2, true,
's', &referencedColumnArray, NULL, &referencedColumnCount);
Assert(referencingColumnCount == referencedColumnCount);
for (attrIdx = 0; attrIdx < referencingColumnCount; ++attrIdx)
{
AttrNumber referencingAttrNo = DatumGetInt16(referencingColumnArray[attrIdx]);
AttrNumber referencedAttrNo = DatumGetInt16(referencedColumnArray[attrIdx]);
if (distributionColumn->varattno == referencingAttrNo &&
referencedTablePartitionColumn->varattno == referencedAttrNo)
{
foreignConstraintOnPartitionColumn = true;
}
}
if (!foreignConstraintOnPartitionColumn)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create foreign key constraint"),
errdetail("Partition column must exist both "
"referencing and referenced side of the "
"foreign constraint statement and it must "
"be in the same ordinal in both sides.")));
}
/*
* We do not allow to create foreign constraints if shard replication factor is
* greater than 1. Because in our current design, multiple replicas may cause
* locking problems and inconsistent shard contents.
*/
if (ShardReplicationFactor > 1 || !SingleReplicatedTable(referencedTableId))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create foreign key constraint"),
errdetail("Citus cannot create foreign key constrains"
" if replication factor is greater than 1. "
"Contact Citus Data for alternative "
"deployment options.")));
}
heapTuple = systable_getnext(scanDescriptor);
}
/* clean up scan and close system catalog */
systable_endscan(scanDescriptor);
heap_close(pgConstraint, AccessShareLock);
} }

View File

@ -34,6 +34,7 @@
#include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h" #include "distributed/pg_dist_shard.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
@ -229,6 +230,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
List *existingShardList = NIL; List *existingShardList = NIL;
List *sourceShardIntervalList = NIL; List *sourceShardIntervalList = NIL;
List *targetTableDDLEvents = NIL; List *targetTableDDLEvents = NIL;
List *targetTableForeignConstraintCommands = NIL;
ListCell *sourceShardCell = NULL; ListCell *sourceShardCell = NULL;
/* make sure that tables are hash partitioned */ /* make sure that tables are hash partitioned */
@ -265,6 +267,8 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
targetTableRelationOwner = TableOwner(targetRelationId); targetTableRelationOwner = TableOwner(targetRelationId);
targetTableDDLEvents = GetTableDDLEvents(targetRelationId); targetTableDDLEvents = GetTableDDLEvents(targetRelationId);
targetTableForeignConstraintCommands = GetTableForeignConstraintCommands(
targetRelationId);
targetShardStorageType = ShardStorageType(targetRelationId); targetShardStorageType = ShardStorageType(targetRelationId);
foreach(sourceShardCell, sourceShardIntervalList) foreach(sourceShardCell, sourceShardIntervalList)
@ -273,6 +277,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
uint64 sourceShardId = sourceShardInterval->shardId; uint64 sourceShardId = sourceShardInterval->shardId;
uint64 newShardId = GetNextShardId(); uint64 newShardId = GetNextShardId();
ListCell *sourceShardPlacementCell = NULL; ListCell *sourceShardPlacementCell = NULL;
int sourceShardIndex = FindShardIntervalIndex(sourceShardInterval);
int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue); int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue);
int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue); int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue);
@ -288,9 +293,10 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
int32 sourceNodePort = sourcePlacement->nodePort; int32 sourceNodePort = sourcePlacement->nodePort;
bool created = WorkerCreateShard(targetRelationId, sourceNodeName, bool created = WorkerCreateShard(targetRelationId, sourceNodeName,
sourceNodePort, newShardId, sourceNodePort, sourceShardIndex, newShardId,
targetTableRelationOwner, targetTableRelationOwner,
targetTableDDLEvents); targetTableDDLEvents,
targetTableForeignConstraintCommands);
if (created) if (created)
{ {
const RelayFileState shardState = FILE_FINALIZED; const RelayFileState shardState = FILE_FINALIZED;

View File

@ -31,6 +31,10 @@
#include "catalog/indexing.h" #include "catalog/indexing.h"
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "catalog/pg_class.h" #include "catalog/pg_class.h"
#include "catalog/pg_constraint.h"
#if (PG_VERSION_NUM >= 90600)
#include "catalog/pg_constraint_fn.h"
#endif
#include "catalog/pg_index.h" #include "catalog/pg_index.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "commands/sequence.h" #include "commands/sequence.h"
@ -579,7 +583,7 @@ ResolveRelationId(text *relationName)
* GetTableDDLEvents takes in a relationId, and returns the list of DDL commands * GetTableDDLEvents takes in a relationId, and returns the list of DDL commands
* needed to reconstruct the relation. These DDL commands are all palloced; and * needed to reconstruct the relation. These DDL commands are all palloced; and
* include the table's schema definition, optional column storage and statistics * include the table's schema definition, optional column storage and statistics
* definitions, and index and constraint defitions. * definitions, and index and constraint definitions.
*/ */
List * List *
GetTableDDLEvents(Oid relationId) GetTableDDLEvents(Oid relationId)
@ -730,6 +734,67 @@ GetTableDDLEvents(Oid relationId)
} }
/*
* GetTableForeignConstraints takes in a relationId, and returns the list of foreign
* constraint commands needed to reconstruct foreign constraints of that table.
*/
List *
GetTableForeignConstraintCommands(Oid relationId)
{
List *tableForeignConstraints = NIL;
Relation pgConstraint = NULL;
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[1];
int scanKeyCount = 1;
HeapTuple heapTuple = NULL;
/*
* Set search_path to NIL so that all objects outside of pg_catalog will be
* schema-prefixed. pg_catalog will be added automatically when we call
* PushOverrideSearchPath(), since we set addCatalog to true;
*/
OverrideSearchPath *overridePath = GetOverrideSearchPath(CurrentMemoryContext);
overridePath->schemas = NIL;
overridePath->addCatalog = true;
PushOverrideSearchPath(overridePath);
/* open system catalog and scan all constraints that belong to this table */
pgConstraint = heap_open(ConstraintRelationId, AccessShareLock);
ScanKeyInit(&scanKey[0], Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ,
relationId);
scanDescriptor = systable_beginscan(pgConstraint, ConstraintRelidIndexId, true, NULL,
scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{
Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
if (constraintForm->contype == CONSTRAINT_FOREIGN)
{
Oid constraintId = get_relation_constraint_oid(relationId,
constraintForm->conname.data,
true);
char *statementDef = pg_get_constraintdef_command(constraintId);
tableForeignConstraints = lappend(tableForeignConstraints, statementDef);
}
heapTuple = systable_getnext(scanDescriptor);
}
/* clean up scan and close system catalog */
systable_endscan(scanDescriptor);
heap_close(pgConstraint, AccessShareLock);
/* revert back to original search_path */
PopOverrideSearchPath();
return tableForeignConstraints;
}
/* /*
* ShardStorageType returns the shard storage type according to relation type. * ShardStorageType returns the shard storage type according to relation type.
*/ */

View File

@ -52,6 +52,7 @@ static void CopyShardPlacement(int64 shardId, char *sourceNodeName, int32 source
bool doRepair); bool doRepair);
static List * CopyShardCommandList(ShardInterval *shardInterval, char *sourceNodeName, static List * CopyShardCommandList(ShardInterval *shardInterval, char *sourceNodeName,
int32 sourceNodePort); int32 sourceNodePort);
static List * CopyShardForeignConstraintCommandList(ShardInterval *shardInterval);
static char * ConstructQualifiedShardName(ShardInterval *shardInterval); static char * ConstructQualifiedShardName(ShardInterval *shardInterval);
static List * RecreateTableDDLCommandList(Oid relationId); static List * RecreateTableDDLCommandList(Oid relationId);
@ -280,24 +281,29 @@ CopyShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort,
ShardInterval *shardInterval = LoadShardInterval(shardId); ShardInterval *shardInterval = LoadShardInterval(shardId);
List *colocatedShardList = ColocatedShardIntervalList(shardInterval); List *colocatedShardList = ColocatedShardIntervalList(shardInterval);
ListCell *colocatedShardCell = NULL; ListCell *colocatedShardCell = NULL;
List *ddlCommandList = NIL;
char *nodeUser = CitusExtensionOwnerName();
foreach(colocatedShardCell, colocatedShardList) foreach(colocatedShardCell, colocatedShardList)
{ {
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell); ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
List *colocatedShardDdlList = NIL; List *ddlCommandList = CopyShardCommandList(colocatedShard, sourceNodeName,
sourceNodePort);
char *tableOwner = TableOwner(colocatedShard->relationId);
colocatedShardDdlList = CopyShardCommandList(colocatedShard, sourceNodeName, SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort,
sourceNodePort); tableOwner, ddlCommandList);
ddlCommandList = list_concat(ddlCommandList, colocatedShardDdlList);
} }
HOLD_INTERRUPTS(); foreach(colocatedShardCell, colocatedShardList)
{
ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell);
List *foreignConstraintCommandList = CopyShardForeignConstraintCommandList(
colocatedShard);
char *tableOwner = TableOwner(colocatedShard->relationId);
SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, nodeUser, SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort,
ddlCommandList); tableOwner,
foreignConstraintCommandList);
}
foreach(colocatedShardCell, colocatedShardList) foreach(colocatedShardCell, colocatedShardList)
{ {
@ -328,8 +334,6 @@ CopyShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort,
targetNodePort); targetNodePort);
} }
} }
RESUME_INTERRUPTS();
} }
@ -376,6 +380,70 @@ CopyShardCommandList(ShardInterval *shardInterval,
} }
/*
* CopyShardForeignConstraintCommandList generates command list to create foreign
* constraints existing in source shard after copying it to the other node.
*/
static List *
CopyShardForeignConstraintCommandList(ShardInterval *shardInterval)
{
List *copyShardForeignConstraintCommandList = NIL;
Oid schemaId = get_rel_namespace(shardInterval->relationId);
char *schemaName = get_namespace_name(schemaId);
char *escapedSchemaName = quote_literal_cstr(schemaName);
int shardIndex = 0;
List *commandList = GetTableForeignConstraintCommands(shardInterval->relationId);
ListCell *commandCell = NULL;
/* we will only use shardIndex if there is a foreign constraint */
if (commandList != NIL)
{
shardIndex = FindShardIntervalIndex(shardInterval);
}
foreach(commandCell, commandList)
{
char *command = (char *) lfirst(commandCell);
char *escapedCommand = quote_literal_cstr(command);
Oid referencedRelationId = InvalidOid;
Oid referencedSchemaId = InvalidOid;
char *referencedSchemaName = NULL;
char *escapedReferencedSchemaName = NULL;
uint64 referencedShardId = INVALID_SHARD_ID;
StringInfo applyForeignConstraintCommand = makeStringInfo();
/* we need to parse the foreign constraint command to get referencing table id */
referencedRelationId = ForeignConstraintGetReferencedTableId(command);
if (referencedRelationId == InvalidOid)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot create foreign key constraint"),
errdetail("Referenced relation cannot be found.")));
}
referencedSchemaId = get_rel_namespace(referencedRelationId);
referencedSchemaName = get_namespace_name(referencedSchemaId);
escapedReferencedSchemaName = quote_literal_cstr(referencedSchemaName);
referencedShardId = ColocatedShardIdInRelation(referencedRelationId, shardIndex);
appendStringInfo(applyForeignConstraintCommand,
WORKER_APPLY_INTER_SHARD_DDL_COMMAND, shardInterval->shardId,
escapedSchemaName, referencedShardId,
escapedReferencedSchemaName, escapedCommand);
copyShardForeignConstraintCommandList = lappend(
copyShardForeignConstraintCommandList,
applyForeignConstraintCommand->data);
}
return copyShardForeignConstraintCommandList;
}
/* /*
* ConstuctQualifiedShardName creates the fully qualified name string of the * ConstuctQualifiedShardName creates the fully qualified name string of the
* given shard in <schema>.<table_name>_<shard_id> format. * given shard in <schema>.<table_name>_<shard_id> format.

View File

@ -20,7 +20,10 @@
#include "access/htup_details.h" #include "access/htup_details.h"
#include "access/xact.h" #include "access/xact.h"
#include "commands/tablecmds.h"
#include "catalog/indexing.h" #include "catalog/indexing.h"
#include "catalog/namespace.h"
#include "distributed/colocation_utils.h"
#include "distributed/multi_client_executor.h" #include "distributed/multi_client_executor.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
@ -384,9 +387,26 @@ CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList,
WorkerNode *workerNode = (WorkerNode *) list_nth(workerNodeList, workerNodeIndex); WorkerNode *workerNode = (WorkerNode *) list_nth(workerNodeList, workerNodeIndex);
char *nodeName = workerNode->workerName; char *nodeName = workerNode->workerName;
uint32 nodePort = workerNode->workerPort; uint32 nodePort = workerNode->workerPort;
List *foreignConstraintCommandList = GetTableForeignConstraintCommands(
relationId);
int shardIndex = -1; /* not used in this code path */
bool created = false;
bool created = WorkerCreateShard(relationId, nodeName, nodePort, shardId, /*
newPlacementOwner, ddlEventList); * In this code path, we create not co-located tables. Therefore we should error
* out if there is a foreign key constraint specified.
*/
if (foreignConstraintCommandList != NIL)
{
ereport(ERROR, (errmsg("could only create distributed table"),
errdetail("Table contains foreign key constraints. Foreign "
"key constraints only supported with co-located "
"tables")));
}
created = WorkerCreateShard(relationId, nodeName, nodePort, shardIndex,
shardId, newPlacementOwner, ddlEventList,
foreignConstraintCommandList);
if (created) if (created)
{ {
const RelayFileState shardState = FILE_FINALIZED; const RelayFileState shardState = FILE_FINALIZED;
@ -424,12 +444,15 @@ CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList,
*/ */
bool bool
WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort, WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
uint64 shardId, char *newShardOwner, List *ddlCommandList) int shardIndex, uint64 shardId, char *newShardOwner,
List *ddlCommandList, List *foreignConstraintCommandList)
{ {
Oid schemaId = get_rel_namespace(relationId); Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId); char *schemaName = get_namespace_name(schemaId);
char *escapedSchemaName = quote_literal_cstr(schemaName);
bool shardCreated = true; bool shardCreated = true;
ListCell *ddlCommandCell = NULL; ListCell *ddlCommandCell = NULL;
ListCell *foreignConstraintCommandCell = NULL;
foreach(ddlCommandCell, ddlCommandList) foreach(ddlCommandCell, ddlCommandList)
{ {
@ -440,8 +463,6 @@ WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
if (strcmp(schemaName, "public") != 0) if (strcmp(schemaName, "public") != 0)
{ {
char *escapedSchemaName = quote_literal_cstr(schemaName);
appendStringInfo(applyDDLCommand, WORKER_APPLY_SHARD_DDL_COMMAND, shardId, appendStringInfo(applyDDLCommand, WORKER_APPLY_SHARD_DDL_COMMAND, shardId,
escapedSchemaName, escapedDDLCommand); escapedSchemaName, escapedDDLCommand);
} }
@ -461,6 +482,47 @@ WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
} }
} }
foreach(foreignConstraintCommandCell, foreignConstraintCommandList)
{
char *command = (char *) lfirst(foreignConstraintCommandCell);
char *escapedCommand = quote_literal_cstr(command);
Oid referencedRelationId = InvalidOid;
Oid referencedSchemaId = InvalidOid;
char *referencedSchemaName = NULL;
char *escapedReferencedSchemaName = NULL;
uint64 referencedShardId = INVALID_SHARD_ID;
List *queryResultList = NIL;
StringInfo applyForeignConstraintCommand = makeStringInfo();
/* we need to parse the foreign constraint command to get referencing table id */
referencedRelationId = ForeignConstraintGetReferencedTableId(command);
if (referencedRelationId == InvalidOid)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot create foreign key constraint"),
errdetail("Referenced relation cannot be found.")));
}
referencedSchemaId = get_rel_namespace(referencedRelationId);
referencedSchemaName = get_namespace_name(referencedSchemaId);
escapedReferencedSchemaName = quote_literal_cstr(referencedSchemaName);
referencedShardId = ColocatedShardIdInRelation(referencedRelationId, shardIndex);
appendStringInfo(applyForeignConstraintCommand,
WORKER_APPLY_INTER_SHARD_DDL_COMMAND, shardId, escapedSchemaName,
referencedShardId, escapedReferencedSchemaName, escapedCommand);
queryResultList = ExecuteRemoteQuery(nodeName, nodePort, newShardOwner,
applyForeignConstraintCommand);
if (queryResultList == NIL)
{
shardCreated = false;
break;
}
}
return shardCreated; return shardCreated;
} }
@ -682,3 +744,31 @@ WorkerShardStats(char *nodeName, uint32 nodePort, Oid relationId, char *shardNam
return true; return true;
} }
/*
* ForeignConstraintGetReferencedTableId parses given foreign constraint query and
* extracts referenced table id from it.
*/
Oid
ForeignConstraintGetReferencedTableId(char *queryString)
{
Node *queryNode = ParseTreeNode(queryString);
AlterTableStmt *foreignConstraintStmt = (AlterTableStmt *) queryNode;
AlterTableCmd *command = (AlterTableCmd *) linitial(foreignConstraintStmt->cmds);
if (command->subtype == AT_AddConstraint)
{
Constraint *constraint = (Constraint *) command->def;
if (constraint->contype == CONSTR_FOREIGN)
{
RangeVar *referencedTable = constraint->pktable;
LOCKMODE lockmode = AlterTableGetLockLevel(foreignConstraintStmt->cmds);
return RangeVarGetRelid(referencedTable, lockmode,
foreignConstraintStmt->missing_ok);
}
}
return InvalidOid;
}

View File

@ -47,9 +47,6 @@
#include "utils/relcache.h" #include "utils/relcache.h"
/* Local functions forward declarations */ /* Local functions forward declarations */
static bool TypeAddIndexConstraint(const AlterTableCmd *command);
static bool TypeDropIndexConstraint(const AlterTableCmd *command,
const RangeVar *relation, uint64 shardId);
static void AppendShardIdToConstraintName(AlterTableCmd *command, uint64 shardId); static void AppendShardIdToConstraintName(AlterTableCmd *command, uint64 shardId);
static void SetSchemaNameIfNotExist(char **schemaName, char *newSchemaName); static void SetSchemaNameIfNotExist(char **schemaName, char *newSchemaName);
static bool UpdateWholeRowColumnReferencesWalker(Node *node, uint64 *shardId); static bool UpdateWholeRowColumnReferencesWalker(Node *node, uint64 *shardId);
@ -81,15 +78,12 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
{ {
/* /*
* We append shardId to the very end of table and index names to * We append shardId to the very end of table and index names to
* avoid name collisions. We usually do not touch constraint names, * avoid name collisions. We also append shardId to constraint names.
* except for cases where they refer to index names. In such cases,
* we also append to constraint names.
*/ */
AlterTableStmt *alterTableStmt = (AlterTableStmt *) parseTree; AlterTableStmt *alterTableStmt = (AlterTableStmt *) parseTree;
char **relationName = &(alterTableStmt->relation->relname); char **relationName = &(alterTableStmt->relation->relname);
char **relationSchemaName = &(alterTableStmt->relation->schemaname); char **relationSchemaName = &(alterTableStmt->relation->schemaname);
RangeVar *relation = alterTableStmt->relation; /* for constraints */
List *commandList = alterTableStmt->cmds; List *commandList = alterTableStmt->cmds;
ListCell *commandCell = NULL; ListCell *commandCell = NULL;
@ -104,8 +98,8 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
{ {
AlterTableCmd *command = (AlterTableCmd *) lfirst(commandCell); AlterTableCmd *command = (AlterTableCmd *) lfirst(commandCell);
if (TypeAddIndexConstraint(command) || if (command->subtype == AT_AddConstraint ||
TypeDropIndexConstraint(command, relation, shardId)) command->subtype == AT_DropConstraint)
{ {
AppendShardIdToConstraintName(command, shardId); AppendShardIdToConstraintName(command, shardId);
} }
@ -418,111 +412,63 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
/* /*
* TypeAddIndexConstraint checks if the alter table command adds a constraint * RelayEventExtendNamesForInterShardCommands extends relation names in the given parse
* and if that constraint also results in an index creation. * tree for certain utility commands. The function more specifically extends table and
* constraint names in the parse tree by appending the given shardId; thereby
* avoiding name collisions in the database among sharded tables. This function
* has the side effect of extending relation names in the parse tree.
*/ */
static bool void
TypeAddIndexConstraint(const AlterTableCmd *command) RelayEventExtendNamesForInterShardCommands(Node *parseTree, uint64 leftShardId,
char *leftShardSchemaName, uint64 rightShardId,
char *rightShardSchemaName)
{ {
if (command->subtype == AT_AddConstraint) NodeTag nodeType = nodeTag(parseTree);
switch (nodeType)
{ {
if (IsA(command->def, Constraint)) case T_AlterTableStmt:
{ {
Constraint *constraint = (Constraint *) command->def; AlterTableStmt *alterTableStmt = (AlterTableStmt *) parseTree;
if (constraint->contype == CONSTR_PRIMARY || List *commandList = alterTableStmt->cmds;
constraint->contype == CONSTR_UNIQUE || ListCell *commandCell = NULL;
constraint->contype == CONSTR_EXCLUSION)
foreach(commandCell, commandList)
{ {
return true; AlterTableCmd *command = (AlterTableCmd *) lfirst(commandCell);
} if (command->subtype == AT_AddConstraint)
} {
} Constraint *constraint = (Constraint *) command->def;
if (constraint->contype == CONSTR_FOREIGN)
{
char **referencedTableName = &(constraint->pktable->relname);
char **relationSchemaName = &(constraint->pktable->schemaname);
return false; /* prefix with schema name if it is not added already */
} SetSchemaNameIfNotExist(relationSchemaName, rightShardSchemaName);
/*
/* * We will not append shard id to referencing table name or
* TypeDropIndexConstraint checks if the alter table command drops a constraint * constraint name. They will be handled when we drop into
* and if that constraint also results in an index drop. Note that drop * RelayEventExtendNames.
* constraints do not have access to constraint type information; this is in */
* contrast with add constraint commands. This function therefore performs AppendShardIdToName(referencedTableName, rightShardId);
* additional system catalog lookups to determine if the drop constraint is }
* associated with an index. }
*/
static bool
TypeDropIndexConstraint(const AlterTableCmd *command,
const RangeVar *relation, uint64 shardId)
{
Relation pgConstraint = NULL;
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[1];
int scanKeyCount = 1;
HeapTuple heapTuple = NULL;
char *searchedConstraintName = NULL;
bool indexConstraint = false;
Oid relationId = InvalidOid;
bool failOK = true;
if (command->subtype != AT_DropConstraint)
{
return false;
}
/*
* At this stage, our only option is performing a relationId lookup. We
* first find the relationId, and then scan the pg_constraints system
* catalog using this relationId. Finally, we check if the passed in
* constraint is for a primary key, unique, or exclusion index.
*/
relationId = RangeVarGetRelid(relation, NoLock, failOK);
if (!OidIsValid(relationId))
{
/* overlook this error, it should be signaled later in the pipeline */
return false;
}
searchedConstraintName = pstrdup(command->name);
AppendShardIdToName(&searchedConstraintName, shardId);
pgConstraint = heap_open(ConstraintRelationId, AccessShareLock);
ScanKeyInit(&scanKey[0], Anum_pg_constraint_conrelid,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId));
scanDescriptor = systable_beginscan(pgConstraint,
ConstraintRelidIndexId, true, /* indexOK */
NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{
Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
char *constraintName = NameStr(constraintForm->conname);
if (strncmp(constraintName, searchedConstraintName, NAMEDATALEN) == 0)
{
/* we found the constraint, now check if it is for an index */
if (constraintForm->contype == CONSTRAINT_PRIMARY ||
constraintForm->contype == CONSTRAINT_UNIQUE ||
constraintForm->contype == CONSTRAINT_EXCLUSION)
{
indexConstraint = true;
} }
/* drop into RelayEventExtendNames for non-inter table commands */
RelayEventExtendNames(parseTree, leftShardSchemaName, leftShardId);
break; break;
} }
heapTuple = systable_getnext(scanDescriptor); default:
{
ereport(WARNING, (errmsg("unsafe statement type in name extension"),
errdetail("Statement type: %u", (uint32) nodeType)));
break;
}
} }
systable_endscan(scanDescriptor);
heap_close(pgConstraint, AccessShareLock);
pfree(searchedConstraintName);
return indexConstraint;
} }

View File

@ -166,7 +166,7 @@ SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *command,
/* /*
* SendCommandListInSingleTransaction opens connection to the node with the given * SendCommandListToWorkerInSingleTransaction opens connection to the node with the given
* nodeName and nodePort. Then, the connection starts a transaction on the remote * nodeName and nodePort. Then, the connection starts a transaction on the remote
* node and executes the commands in the transaction. The function raises error if * node and executes the commands in the transaction. The function raises error if
* any of the queries fails. * any of the queries fails.
@ -186,46 +186,59 @@ SendCommandListToWorkerInSingleTransaction(char *nodeName, int32 nodePort, char
nodeName, nodePort, nodeUser))); nodeName, nodePort, nodeUser)));
} }
/* start the transaction on the worker node */ PG_TRY();
queryResult = PQexec(workerConnection, "BEGIN");
if (PQresultStatus(queryResult) != PGRES_COMMAND_OK)
{ {
ReraiseRemoteError(workerConnection, queryResult); /* start the transaction on the worker node */
} queryResult = PQexec(workerConnection, "BEGIN");
if (PQresultStatus(queryResult) != PGRES_COMMAND_OK)
PQclear(queryResult);
/* iterate over the commands and execute them in the same connection */
foreach(commandCell, commandList)
{
char *commandString = lfirst(commandCell);
ExecStatusType resultStatus = PGRES_EMPTY_QUERY;
queryResult = PQexec(workerConnection, commandString);
resultStatus = PQresultStatus(queryResult);
if (!(resultStatus == PGRES_SINGLE_TUPLE || resultStatus == PGRES_TUPLES_OK ||
resultStatus == PGRES_COMMAND_OK))
{ {
ReraiseRemoteError(workerConnection, queryResult); ReraiseRemoteError(workerConnection, queryResult);
} }
PQclear(queryResult); PQclear(queryResult);
}
/* commit the transaction on the worker node */ /* iterate over the commands and execute them in the same connection */
queryResult = PQexec(workerConnection, "COMMIT"); foreach(commandCell, commandList)
if (PQresultStatus(queryResult) != PGRES_COMMAND_OK) {
char *commandString = lfirst(commandCell);
ExecStatusType resultStatus = PGRES_EMPTY_QUERY;
CHECK_FOR_INTERRUPTS();
queryResult = PQexec(workerConnection, commandString);
resultStatus = PQresultStatus(queryResult);
if (!(resultStatus == PGRES_SINGLE_TUPLE || resultStatus == PGRES_TUPLES_OK ||
resultStatus == PGRES_COMMAND_OK))
{
ReraiseRemoteError(workerConnection, queryResult);
}
PQclear(queryResult);
}
/* commit the transaction on the worker node */
queryResult = PQexec(workerConnection, "COMMIT");
if (PQresultStatus(queryResult) != PGRES_COMMAND_OK)
{
ReraiseRemoteError(workerConnection, queryResult);
}
PQclear(queryResult);
/* clear NULL result */
PQgetResult(workerConnection);
/* we no longer need this connection */
PQfinish(workerConnection);
}
PG_CATCH();
{ {
ReraiseRemoteError(workerConnection, queryResult); /* close the connection */
PQfinish(workerConnection);
PG_RE_THROW();
} }
PG_END_TRY();
PQclear(queryResult);
/* clear NULL result */
PQgetResult(workerConnection);
/* we no longer need this connection */
PQfinish(workerConnection);
} }

View File

@ -260,3 +260,16 @@ ColocatedTableId(Oid colocationId)
return colocatedTableId; return colocatedTableId;
} }
/*
* ColocatedShardIdInRelation returns shardId of the shard from given relation, so that
* returned shard is co-located with given shard.
*/
uint64
ColocatedShardIdInRelation(Oid relationId, int shardIndex)
{
DistTableCacheEntry *tableCacheEntry = DistributedTableCacheEntry(relationId);
return tableCacheEntry->sortedShardIntervalArray[shardIndex]->shardId;
}

View File

@ -256,3 +256,30 @@ SearchCachedShardInterval(Datum partitionColumnValue, ShardInterval **shardInter
return NULL; return NULL;
} }
/*
* SingleReplicatedTable checks whether all shards of a distributed table, do not have
* more than one replica. If even one shard has more than one replica, this function
* returns false, otherwise it returns true.
*/
bool
SingleReplicatedTable(Oid relationId)
{
List *shardIntervalList = LoadShardList(relationId);
ListCell *shardIntervalCell = NULL;
foreach(shardIntervalCell, shardIntervalList)
{
uint64 *shardIdPointer = (uint64 *) lfirst(shardIntervalCell);
uint64 shardId = (*shardIdPointer);
List *shardPlacementList = ShardPlacementList(shardId);
if (shardPlacementList->length > 1)
{
return false;
}
}
return true;
}

View File

@ -74,6 +74,7 @@ static bool check_log_statement(List *stmt_list);
PG_FUNCTION_INFO_V1(worker_fetch_partition_file); PG_FUNCTION_INFO_V1(worker_fetch_partition_file);
PG_FUNCTION_INFO_V1(worker_fetch_query_results_file); PG_FUNCTION_INFO_V1(worker_fetch_query_results_file);
PG_FUNCTION_INFO_V1(worker_apply_shard_ddl_command); PG_FUNCTION_INFO_V1(worker_apply_shard_ddl_command);
PG_FUNCTION_INFO_V1(worker_apply_inter_shard_ddl_command);
PG_FUNCTION_INFO_V1(worker_fetch_regular_table); PG_FUNCTION_INFO_V1(worker_fetch_regular_table);
PG_FUNCTION_INFO_V1(worker_fetch_foreign_file); PG_FUNCTION_INFO_V1(worker_fetch_foreign_file);
PG_FUNCTION_INFO_V1(worker_append_table_to_shard); PG_FUNCTION_INFO_V1(worker_append_table_to_shard);
@ -418,6 +419,36 @@ worker_apply_shard_ddl_command(PG_FUNCTION_ARGS)
} }
/*
* worker_apply_inter_shard_ddl_command extends table, index, or constraint names in
* the given DDL command. The function then applies this extended DDL command
* against the database.
*/
Datum
worker_apply_inter_shard_ddl_command(PG_FUNCTION_ARGS)
{
uint64 leftShardId = PG_GETARG_INT64(0);
text *leftShardSchemaNameText = PG_GETARG_TEXT_P(1);
uint64 rightShardId = PG_GETARG_INT64(2);
text *rightShardSchemaNameText = PG_GETARG_TEXT_P(3);
text *ddlCommandText = PG_GETARG_TEXT_P(4);
char *leftShardSchemaName = text_to_cstring(leftShardSchemaNameText);
char *rightShardSchemaName = text_to_cstring(rightShardSchemaNameText);
const char *ddlCommand = text_to_cstring(ddlCommandText);
Node *ddlCommandNode = ParseTreeNode(ddlCommand);
/* extend names in ddl command and apply extended command */
RelayEventExtendNamesForInterShardCommands(ddlCommandNode, leftShardId,
leftShardSchemaName, rightShardId,
rightShardSchemaName);
ProcessUtility(ddlCommandNode, ddlCommand, PROCESS_UTILITY_TOPLEVEL, NULL,
None_Receiver, NULL);
PG_RETURN_VOID();
}
/* /*
* worker_fetch_regular_table caches the given PostgreSQL table on the local * worker_fetch_regular_table caches the given PostgreSQL table on the local
* node. The function caches this table by trying the given list of node names * node. The function caches this table by trying the given list of node names

View File

@ -24,6 +24,6 @@ extern bool ShardsColocated(ShardInterval *leftShardInterval,
extern List * ColocatedTableList(Oid distributedTableId); extern List * ColocatedTableList(Oid distributedTableId);
extern List * ColocatedShardIntervalList(ShardInterval *shardInterval); extern List * ColocatedShardIntervalList(ShardInterval *shardInterval);
extern Oid ColocatedTableId(Oid colocationId); extern Oid ColocatedTableId(Oid colocationId);
extern uint64 ColocatedShardIdInRelation(Oid relationId, int shardIndex);
#endif /* COLOCATION_UTILS_H_ */ #endif /* COLOCATION_UTILS_H_ */

View File

@ -58,6 +58,9 @@
"SELECT worker_apply_shard_ddl_command (" UINT64_FORMAT ", %s)" "SELECT worker_apply_shard_ddl_command (" UINT64_FORMAT ", %s)"
#define WORKER_APPEND_TABLE_TO_SHARD \ #define WORKER_APPEND_TABLE_TO_SHARD \
"SELECT worker_append_table_to_shard (%s, %s, %s, %u)" "SELECT worker_append_table_to_shard (%s, %s, %s, %u)"
#define WORKER_APPLY_INTER_SHARD_DDL_COMMAND \
"SELECT worker_apply_inter_shard_ddl_command (" UINT64_FORMAT ", %s, " UINT64_FORMAT \
", %s, %s)"
#define SHARD_RANGE_QUERY "SELECT min(%s), max(%s) FROM %s" #define SHARD_RANGE_QUERY "SELECT min(%s), max(%s) FROM %s"
#define SHARD_TABLE_SIZE_QUERY "SELECT pg_table_size(%s)" #define SHARD_TABLE_SIZE_QUERY "SELECT pg_table_size(%s)"
#define SHARD_CSTORE_TABLE_SIZE_QUERY "SELECT cstore_table_size(%s)" #define SHARD_CSTORE_TABLE_SIZE_QUERY "SELECT cstore_table_size(%s)"
@ -93,6 +96,7 @@ extern bool CStoreTable(Oid relationId);
extern uint64 GetNextShardId(void); extern uint64 GetNextShardId(void);
extern Oid ResolveRelationId(text *relationName); extern Oid ResolveRelationId(text *relationName);
extern List * GetTableDDLEvents(Oid relationId); extern List * GetTableDDLEvents(Oid relationId);
extern List * GetTableForeignConstraintCommands(Oid relationId);
extern char ShardStorageType(Oid relationId); extern char ShardStorageType(Oid relationId);
extern void CheckDistributedTable(Oid relationId); extern void CheckDistributedTable(Oid relationId);
extern void CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList, extern void CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList,
@ -103,7 +107,9 @@ extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shard
int32 replicationFactor); int32 replicationFactor);
extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId); extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId);
extern bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort, extern bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
uint64 shardId, char *newShardOwner, List *ddlCommandList); int shardIndex, uint64 shardId, char *newShardOwner,
List *ddlCommandList, List *foreignConstraintCommadList);
extern Oid ForeignConstraintGetReferencedTableId(char *queryString);
/* Function declarations for generating metadata for shard and placement creation */ /* Function declarations for generating metadata for shard and placement creation */
extern Datum master_get_table_metadata(PG_FUNCTION_ARGS); extern Datum master_get_table_metadata(PG_FUNCTION_ARGS);

View File

@ -42,6 +42,11 @@ typedef enum
/* Function declarations to extend names in DDL commands */ /* Function declarations to extend names in DDL commands */
extern void RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId); extern void RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId);
extern void RelayEventExtendNamesForInterShardCommands(Node *parseTree,
uint64 leftShardId,
char *leftShardSchemaName,
uint64 rightShardId,
char *rightShardSchemaName);
extern void AppendShardIdToName(char **name, uint64 shardId); extern void AppendShardIdToName(char **name, uint64 shardId);
#endif /* RELAY_UTILITY_H */ #endif /* RELAY_UTILITY_H */

View File

@ -32,5 +32,6 @@ extern ShardInterval * FindShardInterval(Datum partitionColumnValue,
int shardCount, char partitionMethod, int shardCount, char partitionMethod,
FmgrInfo *compareFunction, FmgrInfo *compareFunction,
FmgrInfo *hashFunction, bool useBinarySearch); FmgrInfo *hashFunction, bool useBinarySearch);
extern bool SingleReplicatedTable(Oid relationId);
#endif /* SHARDINTERVAL_UTILS_H_ */ #endif /* SHARDINTERVAL_UTILS_H_ */

View File

@ -38,6 +38,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-9';
ALTER EXTENSION citus UPDATE TO '6.0-10'; ALTER EXTENSION citus UPDATE TO '6.0-10';
ALTER EXTENSION citus UPDATE TO '6.0-11'; ALTER EXTENSION citus UPDATE TO '6.0-11';
ALTER EXTENSION citus UPDATE TO '6.0-12'; ALTER EXTENSION citus UPDATE TO '6.0-12';
ALTER EXTENSION citus UPDATE TO '6.0-13';
-- drop extension an re-create in newest version -- drop extension an re-create in newest version
DROP EXTENSION citus; DROP EXTENSION citus;
\c \c

View File

@ -0,0 +1,341 @@
--
-- MULTI_FOREIGN_KEY
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1350000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1350000;
-- set shard_count to 4 for faster tests, because we create/drop lots of shards in this test.
SET citus.shard_count TO 4;
-- create tables
CREATE TABLE referenced_table(id int UNIQUE, test_column int, PRIMARY KEY(id, test_column));
SELECT create_distributed_table('referenced_table', 'id', 'hash');
create_distributed_table
--------------------------
(1 row)
-- test foreign constraint creation with not supported parameters
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id) REFERENCES referenced_table(id) ON DELETE SET NULL);
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
ERROR: cannot create foreign key constraint
DETAIL: SET NULL or SET DEFAULT is not supported in ON DELETE operation.
DROP TABLE referencing_table;
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id) REFERENCES referenced_table(id) ON DELETE SET DEFAULT);
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
ERROR: cannot create foreign key constraint
DETAIL: SET NULL or SET DEFAULT is not supported in ON DELETE operation.
DROP TABLE referencing_table;
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id) REFERENCES referenced_table(id) ON UPDATE SET NULL);
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
ERROR: cannot create foreign key constraint
DETAIL: SET NULL, SET DEFAULT or CASCADE is not supported in ON UPDATE operation.
DROP TABLE referencing_table;
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id) REFERENCES referenced_table(id) ON UPDATE SET DEFAULT);
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
ERROR: cannot create foreign key constraint
DETAIL: SET NULL, SET DEFAULT or CASCADE is not supported in ON UPDATE operation.
DROP TABLE referencing_table;
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id) REFERENCES referenced_table(id) ON UPDATE CASCADE);
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
ERROR: cannot create foreign key constraint
DETAIL: SET NULL, SET DEFAULT or CASCADE is not supported in ON UPDATE operation.
DROP TABLE referencing_table;
-- test foreign constraint creation on NOT co-located tables
SET citus.shard_count TO 8;
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id) REFERENCES referenced_table(id));
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
ERROR: cannot create foreign key constraint
DETAIL: Foreign key constraint can only be created on co-located tables.
DROP TABLE referencing_table;
SET citus.shard_count TO 4;
-- test foreign constraint creation on non-partition columns
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(id) REFERENCES referenced_table(id));
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
ERROR: cannot create foreign key constraint
DETAIL: Partition column must exist both referencing and referenced side of the foreign constraint statement and it must be in the same ordinal in both sides.
DROP TABLE referencing_table;
-- test foreign constraint creation while column list are in incorrect order
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(id, ref_id) REFERENCES referenced_table(id, test_column));
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
ERROR: cannot create foreign key constraint
DETAIL: Partition column must exist both referencing and referenced side of the foreign constraint statement and it must be in the same ordinal in both sides.
DROP TABLE referencing_table;
-- test foreign constraint with replication factor > 1
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id) REFERENCES referenced_table(id));
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
ERROR: cannot create foreign key constraint
DETAIL: Citus cannot create foreign key constrains if replication factor is greater than 1. Contact Citus Data for alternative deployment options.
DROP TABLE referencing_table;
DROP TABLE referenced_table;
-- test foreign constraint with correct conditions
SET citus.shard_replication_factor TO 1;
CREATE TABLE referenced_table(id int UNIQUE, test_column int, PRIMARY KEY(id, test_column));
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id) REFERENCES referenced_table(id));
SELECT create_distributed_table('referenced_table', 'id', 'hash');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
create_distributed_table
--------------------------
(1 row)
-- test inserts
-- test insert to referencing table while there is NO corresponding value in referenced table
INSERT INTO referencing_table VALUES(1, 1);
ERROR: insert or update on table "referencing_table_1350008" violates foreign key constraint "referencing_table_ref_id_fkey_1350008"
DETAIL: Key (ref_id)=(1) is not present in table "referenced_table_1350004".
CONTEXT: while executing command on localhost:57637
-- test insert to referencing while there is corresponding value in referenced table
INSERT INTO referenced_table VALUES(1, 1);
INSERT INTO referencing_table VALUES(1, 1);
-- test deletes
-- test delete from referenced table while there is corresponding value in referencing table
DELETE FROM referenced_table WHERE id = 1;
ERROR: update or delete on table "referenced_table_1350004" violates foreign key constraint "referencing_table_ref_id_fkey_1350008" on table "referencing_table_1350008"
DETAIL: Key (id)=(1) is still referenced from table "referencing_table_1350008".
CONTEXT: while executing command on localhost:57637
-- test delete from referenced table while there is NO corresponding value in referencing table
DELETE FROM referencing_table WHERE ref_id = 1;
DELETE FROM referenced_table WHERE id = 1;
-- drop table for next tests
DROP TABLE referencing_table;
DROP TABLE referenced_table;
-- test foreign constraint options
-- test ON DELETE CASCADE
CREATE TABLE referenced_table(id int UNIQUE, test_column int, PRIMARY KEY(id, test_column));
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id) REFERENCES referenced_table(id) ON DELETE CASCADE);
SELECT create_distributed_table('referenced_table', 'id', 'hash');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
create_distributed_table
--------------------------
(1 row)
INSERT INTO referenced_table VALUES(1, 1);
INSERT INTO referencing_table VALUES(1, 1);
DELETE FROM referenced_table WHERE id = 1;
SELECT * FROM referencing_table;
id | ref_id
----+--------
(0 rows)
SELECT * FROM referenced_table;
id | test_column
----+-------------
(0 rows)
DROP TABLE referencing_table;
DROP TABLE referenced_table;
-- test ON DELETE NO ACTION + DEFERABLE + INITIALLY DEFERRED
CREATE TABLE referenced_table(id int UNIQUE, test_column int, PRIMARY KEY(id, test_column));
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id) REFERENCES referenced_table(id) ON DELETE NO ACTION DEFERRABLE INITIALLY DEFERRED);
SELECT create_distributed_table('referenced_table', 'id', 'hash');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
create_distributed_table
--------------------------
(1 row)
INSERT INTO referenced_table VALUES(1, 1);
INSERT INTO referencing_table VALUES(1, 1);
DELETE FROM referenced_table WHERE id = 1;
ERROR: update or delete on table "referenced_table_1350020" violates foreign key constraint "referencing_table_ref_id_fkey_1350024" on table "referencing_table_1350024"
DETAIL: Key (id)=(1) is still referenced from table "referencing_table_1350024".
CONTEXT: while executing command on localhost:57637
BEGIN;
DELETE FROM referenced_table WHERE id = 1;
DELETE FROM referencing_table WHERE ref_id = 1;
COMMIT;
SELECT * FROM referencing_table;
id | ref_id
----+--------
(0 rows)
SELECT * FROM referenced_table;
id | test_column
----+-------------
(0 rows)
DROP TABLE referencing_table;
DROP TABLE referenced_table;
-- test ON DELETE RESTRICT
CREATE TABLE referenced_table(id int UNIQUE, test_column int, PRIMARY KEY(id, test_column));
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id) REFERENCES referenced_table(id) ON DELETE RESTRICT);
SELECT create_distributed_table('referenced_table', 'id', 'hash');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
create_distributed_table
--------------------------
(1 row)
INSERT INTO referenced_table VALUES(1, 1);
INSERT INTO referencing_table VALUES(1, 1);
BEGIN;
DELETE FROM referenced_table WHERE id = 1;
ERROR: update or delete on table "referenced_table_1350028" violates foreign key constraint "referencing_table_ref_id_fkey_1350032" on table "referencing_table_1350032"
DETAIL: Key (id)=(1) is still referenced from table "referencing_table_1350032".
CONTEXT: while executing command on localhost:57637
DELETE FROM referencing_table WHERE ref_id = 1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
SELECT * FROM referencing_table;
id | ref_id
----+--------
1 | 1
(1 row)
SELECT * FROM referenced_table;
id | test_column
----+-------------
1 | 1
(1 row)
DROP TABLE referencing_table;
DROP TABLE referenced_table;
-- test ON UPDATE NO ACTION + DEFERABLE + INITIALLY DEFERRED
CREATE TABLE referenced_table(id int UNIQUE, test_column int, PRIMARY KEY(id, test_column));
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id, id) REFERENCES referenced_table(id, test_column) ON UPDATE NO ACTION DEFERRABLE INITIALLY DEFERRED);
SELECT create_distributed_table('referenced_table', 'id', 'hash');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
create_distributed_table
--------------------------
(1 row)
INSERT INTO referenced_table VALUES(1, 1);
INSERT INTO referencing_table VALUES(1, 1);
UPDATE referenced_table SET test_column = 10 WHERE id = 1;
ERROR: update or delete on table "referenced_table_1350036" violates foreign key constraint "referencing_table_ref_id_fkey_1350040" on table "referencing_table_1350040"
DETAIL: Key (id, test_column)=(1, 1) is still referenced from table "referencing_table_1350040".
CONTEXT: while executing command on localhost:57637
BEGIN;
UPDATE referenced_table SET test_column = 10 WHERE id = 1;
UPDATE referencing_table SET id = 10 WHERE ref_id = 1;
COMMIT;
SELECT * FROM referencing_table;
id | ref_id
----+--------
10 | 1
(1 row)
SELECT * FROM referenced_table;
id | test_column
----+-------------
1 | 10
(1 row)
DROP TABLE referencing_table;
DROP TABLE referenced_table;
-- test ON UPDATE RESTRICT
CREATE TABLE referenced_table(id int UNIQUE, test_column int, PRIMARY KEY(id, test_column));
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id, id) REFERENCES referenced_table(id, test_column) ON UPDATE RESTRICT);
SELECT create_distributed_table('referenced_table', 'id', 'hash');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
create_distributed_table
--------------------------
(1 row)
INSERT INTO referenced_table VALUES(1, 1);
INSERT INTO referencing_table VALUES(1, 1);
BEGIN;
UPDATE referenced_table SET test_column = 20 WHERE id = 1;
ERROR: update or delete on table "referenced_table_1350044" violates foreign key constraint "referencing_table_ref_id_fkey_1350048" on table "referencing_table_1350048"
DETAIL: Key (id, test_column)=(1, 1) is still referenced from table "referencing_table_1350048".
CONTEXT: while executing command on localhost:57637
UPDATE referencing_table SET id = 20 WHERE ref_id = 1;
ERROR: current transaction is aborted, commands ignored until end of transaction block
COMMIT;
SELECT * FROM referencing_table;
id | ref_id
----+--------
1 | 1
(1 row)
SELECT * FROM referenced_table;
id | test_column
----+-------------
1 | 1
(1 row)
DROP TABLE referencing_table;
DROP TABLE referenced_table;
-- test MATCH SIMPLE
CREATE TABLE referenced_table(id int UNIQUE, test_column int, PRIMARY KEY(id, test_column));
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id, id) REFERENCES referenced_table(id, test_column) MATCH SIMPLE);
SELECT create_distributed_table('referenced_table', 'id', 'hash');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
create_distributed_table
--------------------------
(1 row)
INSERT INTO referencing_table VALUES(null, 2);
SELECT * FROM referencing_table;
id | ref_id
----+--------
| 2
(1 row)
DELETE FROM referencing_table WHERE ref_id = 2;
DROP TABLE referencing_table;
DROP TABLE referenced_table;
-- test MATCH FULL
CREATE TABLE referenced_table(id int UNIQUE, test_column int, PRIMARY KEY(id, test_column));
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id, id) REFERENCES referenced_table(id, test_column) MATCH FULL);
SELECT create_distributed_table('referenced_table', 'id', 'hash');
create_distributed_table
--------------------------
(1 row)
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
create_distributed_table
--------------------------
(1 row)
INSERT INTO referencing_table VALUES(null, 2);
ERROR: insert or update on table "referencing_table_1350067" violates foreign key constraint "referencing_table_ref_id_fkey_1350067"
DETAIL: MATCH FULL does not allow mixing of null and nonnull key values.
CONTEXT: while executing command on localhost:57638
SELECT * FROM referencing_table;
id | ref_id
----+--------
(0 rows)
DROP TABLE referencing_table;
DROP TABLE referenced_table;

View File

@ -188,3 +188,8 @@ test: multi_colocated_shard_transfer
# multi_citus_tools tests utility functions written for citus tools # multi_citus_tools tests utility functions written for citus tools
# ---------- # ----------
test: multi_citus_tools test: multi_citus_tools
# ----------
# multi_foreign_key tests foreign key push down on distributed tables
# ----------
test: multi_foreign_key

View File

@ -43,6 +43,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-9';
ALTER EXTENSION citus UPDATE TO '6.0-10'; ALTER EXTENSION citus UPDATE TO '6.0-10';
ALTER EXTENSION citus UPDATE TO '6.0-11'; ALTER EXTENSION citus UPDATE TO '6.0-11';
ALTER EXTENSION citus UPDATE TO '6.0-12'; ALTER EXTENSION citus UPDATE TO '6.0-12';
ALTER EXTENSION citus UPDATE TO '6.0-13';
-- drop extension an re-create in newest version -- drop extension an re-create in newest version
DROP EXTENSION citus; DROP EXTENSION citus;

View File

@ -0,0 +1,187 @@
--
-- MULTI_FOREIGN_KEY
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1350000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1350000;
-- set shard_count to 4 for faster tests, because we create/drop lots of shards in this test.
SET citus.shard_count TO 4;
-- create tables
CREATE TABLE referenced_table(id int UNIQUE, test_column int, PRIMARY KEY(id, test_column));
SELECT create_distributed_table('referenced_table', 'id', 'hash');
-- test foreign constraint creation with not supported parameters
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id) REFERENCES referenced_table(id) ON DELETE SET NULL);
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
DROP TABLE referencing_table;
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id) REFERENCES referenced_table(id) ON DELETE SET DEFAULT);
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
DROP TABLE referencing_table;
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id) REFERENCES referenced_table(id) ON UPDATE SET NULL);
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
DROP TABLE referencing_table;
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id) REFERENCES referenced_table(id) ON UPDATE SET DEFAULT);
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
DROP TABLE referencing_table;
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id) REFERENCES referenced_table(id) ON UPDATE CASCADE);
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
DROP TABLE referencing_table;
-- test foreign constraint creation on NOT co-located tables
SET citus.shard_count TO 8;
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id) REFERENCES referenced_table(id));
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
DROP TABLE referencing_table;
SET citus.shard_count TO 4;
-- test foreign constraint creation on non-partition columns
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(id) REFERENCES referenced_table(id));
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
DROP TABLE referencing_table;
-- test foreign constraint creation while column list are in incorrect order
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(id, ref_id) REFERENCES referenced_table(id, test_column));
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
DROP TABLE referencing_table;
-- test foreign constraint with replication factor > 1
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id) REFERENCES referenced_table(id));
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
DROP TABLE referencing_table;
DROP TABLE referenced_table;
-- test foreign constraint with correct conditions
SET citus.shard_replication_factor TO 1;
CREATE TABLE referenced_table(id int UNIQUE, test_column int, PRIMARY KEY(id, test_column));
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id) REFERENCES referenced_table(id));
SELECT create_distributed_table('referenced_table', 'id', 'hash');
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
-- test inserts
-- test insert to referencing table while there is NO corresponding value in referenced table
INSERT INTO referencing_table VALUES(1, 1);
-- test insert to referencing while there is corresponding value in referenced table
INSERT INTO referenced_table VALUES(1, 1);
INSERT INTO referencing_table VALUES(1, 1);
-- test deletes
-- test delete from referenced table while there is corresponding value in referencing table
DELETE FROM referenced_table WHERE id = 1;
-- test delete from referenced table while there is NO corresponding value in referencing table
DELETE FROM referencing_table WHERE ref_id = 1;
DELETE FROM referenced_table WHERE id = 1;
-- drop table for next tests
DROP TABLE referencing_table;
DROP TABLE referenced_table;
-- test foreign constraint options
-- test ON DELETE CASCADE
CREATE TABLE referenced_table(id int UNIQUE, test_column int, PRIMARY KEY(id, test_column));
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id) REFERENCES referenced_table(id) ON DELETE CASCADE);
SELECT create_distributed_table('referenced_table', 'id', 'hash');
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
INSERT INTO referenced_table VALUES(1, 1);
INSERT INTO referencing_table VALUES(1, 1);
DELETE FROM referenced_table WHERE id = 1;
SELECT * FROM referencing_table;
SELECT * FROM referenced_table;
DROP TABLE referencing_table;
DROP TABLE referenced_table;
-- test ON DELETE NO ACTION + DEFERABLE + INITIALLY DEFERRED
CREATE TABLE referenced_table(id int UNIQUE, test_column int, PRIMARY KEY(id, test_column));
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id) REFERENCES referenced_table(id) ON DELETE NO ACTION DEFERRABLE INITIALLY DEFERRED);
SELECT create_distributed_table('referenced_table', 'id', 'hash');
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
INSERT INTO referenced_table VALUES(1, 1);
INSERT INTO referencing_table VALUES(1, 1);
DELETE FROM referenced_table WHERE id = 1;
BEGIN;
DELETE FROM referenced_table WHERE id = 1;
DELETE FROM referencing_table WHERE ref_id = 1;
COMMIT;
SELECT * FROM referencing_table;
SELECT * FROM referenced_table;
DROP TABLE referencing_table;
DROP TABLE referenced_table;
-- test ON DELETE RESTRICT
CREATE TABLE referenced_table(id int UNIQUE, test_column int, PRIMARY KEY(id, test_column));
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id) REFERENCES referenced_table(id) ON DELETE RESTRICT);
SELECT create_distributed_table('referenced_table', 'id', 'hash');
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
INSERT INTO referenced_table VALUES(1, 1);
INSERT INTO referencing_table VALUES(1, 1);
BEGIN;
DELETE FROM referenced_table WHERE id = 1;
DELETE FROM referencing_table WHERE ref_id = 1;
COMMIT;
SELECT * FROM referencing_table;
SELECT * FROM referenced_table;
DROP TABLE referencing_table;
DROP TABLE referenced_table;
-- test ON UPDATE NO ACTION + DEFERABLE + INITIALLY DEFERRED
CREATE TABLE referenced_table(id int UNIQUE, test_column int, PRIMARY KEY(id, test_column));
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id, id) REFERENCES referenced_table(id, test_column) ON UPDATE NO ACTION DEFERRABLE INITIALLY DEFERRED);
SELECT create_distributed_table('referenced_table', 'id', 'hash');
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
INSERT INTO referenced_table VALUES(1, 1);
INSERT INTO referencing_table VALUES(1, 1);
UPDATE referenced_table SET test_column = 10 WHERE id = 1;
BEGIN;
UPDATE referenced_table SET test_column = 10 WHERE id = 1;
UPDATE referencing_table SET id = 10 WHERE ref_id = 1;
COMMIT;
SELECT * FROM referencing_table;
SELECT * FROM referenced_table;
DROP TABLE referencing_table;
DROP TABLE referenced_table;
-- test ON UPDATE RESTRICT
CREATE TABLE referenced_table(id int UNIQUE, test_column int, PRIMARY KEY(id, test_column));
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id, id) REFERENCES referenced_table(id, test_column) ON UPDATE RESTRICT);
SELECT create_distributed_table('referenced_table', 'id', 'hash');
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
INSERT INTO referenced_table VALUES(1, 1);
INSERT INTO referencing_table VALUES(1, 1);
BEGIN;
UPDATE referenced_table SET test_column = 20 WHERE id = 1;
UPDATE referencing_table SET id = 20 WHERE ref_id = 1;
COMMIT;
SELECT * FROM referencing_table;
SELECT * FROM referenced_table;
DROP TABLE referencing_table;
DROP TABLE referenced_table;
-- test MATCH SIMPLE
CREATE TABLE referenced_table(id int UNIQUE, test_column int, PRIMARY KEY(id, test_column));
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id, id) REFERENCES referenced_table(id, test_column) MATCH SIMPLE);
SELECT create_distributed_table('referenced_table', 'id', 'hash');
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
INSERT INTO referencing_table VALUES(null, 2);
SELECT * FROM referencing_table;
DELETE FROM referencing_table WHERE ref_id = 2;
DROP TABLE referencing_table;
DROP TABLE referenced_table;
-- test MATCH FULL
CREATE TABLE referenced_table(id int UNIQUE, test_column int, PRIMARY KEY(id, test_column));
CREATE TABLE referencing_table(id int, ref_id int, FOREIGN KEY(ref_id, id) REFERENCES referenced_table(id, test_column) MATCH FULL);
SELECT create_distributed_table('referenced_table', 'id', 'hash');
SELECT create_distributed_table('referencing_table', 'ref_id', 'hash');
INSERT INTO referencing_table VALUES(null, 2);
SELECT * FROM referencing_table;
DROP TABLE referencing_table;
DROP TABLE referenced_table;