From 2c5d59f3a8ca147d4566d8bde42280ebf9c01d4e Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?mehmet=20furkan=20=C5=9Fahin?=
Date: Tue, 19 Jun 2018 18:40:35 +0300
Subject: [PATCH] create_distributed_table in transaction is fixed

---
 .../commands/create_distributed_table.c       | 61 +++++++++++++------
 .../distributed/ddl/foreign_constraint.c      | 56 +++++++++++++++++
 .../master/master_stage_protocol.c            |  5 +-
 src/include/distributed/foreign_constraint.h  |  1 +
 4 files changed, 102 insertions(+), 21 deletions(-)

diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c
index 4a4d7de93..b557cbdd4 100644
--- a/src/backend/distributed/commands/create_distributed_table.c
+++ b/src/backend/distributed/commands/create_distributed_table.c
@@ -100,6 +100,7 @@ static bool LocalTableEmpty(Oid tableId);
 static void CopyLocalDataIntoShards(Oid relationId);
 static List * TupleDescColumnNameList(TupleDesc tupleDescriptor);
 static bool RelationUsesIdentityColumns(TupleDesc relationDesc);
+static bool CanUseExclusiveConnections(Oid relationId, bool localTableEmpty);
 
 /* exports for SQL callable functions */
 PG_FUNCTION_INFO_V1(master_create_distributed_table);
@@ -484,26 +485,8 @@ CreateHashDistributedTableShards(Oid relationId, Oid colocatedTableId,
 	 */
 	if (RegularTable(relationId))
 	{
-		if (!localTableEmpty && MultiShardConnectionType == SEQUENTIAL_CONNECTION)
-		{
-			char *relationName = get_rel_name(relationId);
-
-			ereport(ERROR, (errmsg("cannot distribute \"%s\" in sequential mode "
-								   "because it is not empty", relationName),
-							errhint("If you have manually set "
-									"citus.multi_shard_modify_mode to 'sequential', "
-									"try with 'parallel' option. If that is not the "
-									"case, try distributing local tables when they "
-									"are empty.")));
-		}
-		else if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
-		{
-			useExclusiveConnection = false;
-		}
-		else if (!localTableEmpty || IsTransactionBlock())
-		{
-			useExclusiveConnection = true;
-		}
+		useExclusiveConnection = CanUseExclusiveConnections(relationId,
+															localTableEmpty);
 	}
 
 	if (colocatedTableId != InvalidOid)
@@ -1107,6 +1090,44 @@ LocalTableEmpty(Oid tableId)
 }
 
 
+/*
+ * CanUseExclusiveConnections decides whether shard creation may use
+ * exclusive connections. Sequential execution is required when
+ * MultiShardConnectionType is SEQUENTIAL_CONNECTION or when the table has a
+ * foreign key to a reference table; in that case we error out if the local
+ * table is not empty and return false otherwise. When sequential execution
+ * is not required, we return true if the local table has data or if we are
+ * in a transaction block.
+ */
+static bool
+CanUseExclusiveConnections(Oid relationId, bool localTableEmpty)
+{
+	bool referencesReferenceTable = HasForeignKeyToReferenceTable(relationId);
+	bool mustRunSequentially = referencesReferenceTable ||
+							   MultiShardConnectionType == SEQUENTIAL_CONNECTION;
+
+	if (mustRunSequentially)
+	{
+		if (!localTableEmpty)
+		{
+			char *tableName = get_rel_name(relationId);
+
+			ereport(ERROR, (errmsg("cannot distribute \"%s\" in sequential mode "
+								   "because it is not empty", tableName),
+							errhint("If you have manually set "
+									"citus.multi_shard_modify_mode to 'sequential', "
+									"try with 'parallel' option. If that is not the "
+									"case, try distributing local tables when they "
+									"are empty.")));
+		}
+
+		return false;
+	}
+
+	return !localTableEmpty || IsTransactionBlock();
+}
+
+
 /*
  * CreateTruncateTrigger creates a truncate trigger on table identified by relationId
  * and assigns citus_truncate_trigger() as handler.
diff --git a/src/backend/distributed/ddl/foreign_constraint.c b/src/backend/distributed/ddl/foreign_constraint.c
index aa9248d22..84caa3b76 100644
--- a/src/backend/distributed/ddl/foreign_constraint.c
+++ b/src/backend/distributed/ddl/foreign_constraint.c
@@ -370,6 +370,62 @@ GetTableForeignConstraintCommands(Oid relationId)
 }
 
 
+/*
+ * HasForeignKeyToReferenceTable function scans the pgConstraint table to
+ * fetch all of the constraints on the given relationId and see if at least one
+ * of them is a foreign key referencing to a reference table, that is, a
+ * distributed table whose partition method is DISTRIBUTE_BY_NONE.
+ *
+ * The function returns true as soon as such a foreign key is found, and false
+ * if the scan finishes without finding one.
+ */
+bool
+HasForeignKeyToReferenceTable(Oid relationId)
+{
+	Relation pgConstraint = NULL;
+	SysScanDesc scanDescriptor = NULL;
+	ScanKeyData scanKey[1];
+	int scanKeyCount = 1;
+	HeapTuple heapTuple = NULL;
+	bool hasForeignKeyToReferenceTable = false;
+
+	/* look up the constraints whose conrelid is the given relation */
+	pgConstraint = heap_open(ConstraintRelationId, AccessShareLock);
+	ScanKeyInit(&scanKey[0], Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ,
+				relationId);
+	scanDescriptor = systable_beginscan(pgConstraint, ConstraintRelidIndexId, true, NULL,
+										scanKeyCount, scanKey);
+
+	heapTuple = systable_getnext(scanDescriptor);
+	while (HeapTupleIsValid(heapTuple))
+	{
+		Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
+		Oid referencedTableId = constraintForm->confrelid;
+
+		/*
+		 * Only a foreign key whose referenced table is distributed with
+		 * DISTRIBUTE_BY_NONE ends the search. Every other tuple has to reach
+		 * the systable_getnext() call at the bottom of the loop; skipping
+		 * that call would make the loop examine the same tuple forever.
+		 */
+		if (constraintForm->contype == CONSTRAINT_FOREIGN &&
+			IsDistributedTable(referencedTableId) &&
+			PartitionMethod(referencedTableId) == DISTRIBUTE_BY_NONE)
+		{
+			hasForeignKeyToReferenceTable = true;
+			break;
+		}
+
+		heapTuple = systable_getnext(scanDescriptor);
+	}
+
+	/* clean up scan and close system catalog */
+	systable_endscan(scanDescriptor);
+	heap_close(pgConstraint, NoLock);
+	return hasForeignKeyToReferenceTable;
+}
+
+
 /*
  * TableReferenced function checks whether given table is referenced by another table
  * via foreign
constraints. If it is referenced, this function returns true. To check diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 5b3239d62..6129931cb 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -662,7 +662,10 @@ WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlComma * In case of self referencing shards, relation itself might not be distributed * already. Therefore we cannot use ColocatedShardIdInRelation which assumes * given relation is distributed. Besides, since we know foreign key references - * itself, referencedShardId is actual shardId anyway. + * itself, referencedShardId is actual shardId anyway. Also, if the referenced + * relation is a reference table, we cannot use ColocatedShardIdInRelation since + * reference tables only have one shard. Instead, we fetch the one and only shard + * from shardlist and use it. */ if (relationId == referencedRelationId) { diff --git a/src/include/distributed/foreign_constraint.h b/src/include/distributed/foreign_constraint.h index 5261f46c6..b85a6891e 100644 --- a/src/include/distributed/foreign_constraint.h +++ b/src/include/distributed/foreign_constraint.h @@ -19,6 +19,7 @@ extern void ErrorIfUnsupportedForeignConstraint(Relation relation, char Var *distributionColumn, uint32 colocationId); extern List * GetTableForeignConstraintCommands(Oid relationId); +extern bool HasForeignKeyToReferenceTable(Oid relationId); extern bool TableReferenced(Oid relationId); #endif