create_distributed_table in transaction is fixed

pull/2240/head
mehmet furkan şahin 2018-06-19 18:40:35 +03:00
parent 45f8017f42
commit 2c5d59f3a8
4 changed files with 102 additions and 21 deletions

View File

@ -100,6 +100,7 @@ static bool LocalTableEmpty(Oid tableId);
static void CopyLocalDataIntoShards(Oid relationId);
static List * TupleDescColumnNameList(TupleDesc tupleDescriptor);
static bool RelationUsesIdentityColumns(TupleDesc relationDesc);
static bool CanUseExclusiveConnections(Oid relationId, bool localTableEmpty);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_create_distributed_table);
@ -484,26 +485,8 @@ CreateHashDistributedTableShards(Oid relationId, Oid colocatedTableId,
*/
if (RegularTable(relationId))
{
if (!localTableEmpty && MultiShardConnectionType == SEQUENTIAL_CONNECTION)
{
char *relationName = get_rel_name(relationId);
ereport(ERROR, (errmsg("cannot distribute \"%s\" in sequential mode "
"because it is not empty", relationName),
errhint("If you have manually set "
"citus.multi_shard_modify_mode to 'sequential', "
"try with 'parallel' option. If that is not the "
"case, try distributing local tables when they "
"are empty.")));
}
else if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
{
useExclusiveConnection = false;
}
else if (!localTableEmpty || IsTransactionBlock())
{
useExclusiveConnection = true;
}
useExclusiveConnection = CanUseExclusiveConnections(relationId,
localTableEmpty);
}
if (colocatedTableId != InvalidOid)
@ -1107,6 +1090,44 @@ LocalTableEmpty(Oid tableId)
}
/*
* CanUseExclusiveConnections checks if we can open parallel connections
* while creating shards. We simply error out if we need to execute
* sequentially but there is data in the table, since we cannot copy the
* data to shards sequentially.
*/
static bool
CanUseExclusiveConnections(Oid relationId, bool localTableEmpty)
{
bool hasForeignKeyToReferenceTable = HasForeignKeyToReferenceTable(relationId);
bool shouldRunSequential = MultiShardConnectionType == SEQUENTIAL_CONNECTION ||
hasForeignKeyToReferenceTable;
if (!localTableEmpty && shouldRunSequential)
{
char *relationName = get_rel_name(relationId);
ereport(ERROR, (errmsg("cannot distribute \"%s\" in sequential mode "
"because it is not empty", relationName),
errhint("If you have manually set "
"citus.multi_shard_modify_mode to 'sequential', "
"try with 'parallel' option. If that is not the "
"case, try distributing local tables when they "
"are empty.")));
}
else if (shouldRunSequential)
{
return false;
}
else if (!localTableEmpty || IsTransactionBlock())
{
return true;
}
return false;
}
/*
* CreateTruncateTrigger creates a truncate trigger on table identified by relationId
* and assigns citus_truncate_trigger() as handler.

View File

@ -370,6 +370,62 @@ GetTableForeignConstraintCommands(Oid relationId)
}
/*
* HasForeignKeyToReferenceTable function scans the pgConstraint table to
* fetch all of the constraints on the given relationId and see if at least one
* of them is a foreign key referencing to a reference table.
*/
bool
HasForeignKeyToReferenceTable(Oid relationId)
{
Relation pgConstraint = NULL;
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[1];
int scanKeyCount = 1;
HeapTuple heapTuple = NULL;
bool hasForeignKeyToReferenceTable = false;
pgConstraint = heap_open(ConstraintRelationId, AccessShareLock);
ScanKeyInit(&scanKey[0], Anum_pg_constraint_conrelid, BTEqualStrategyNumber, F_OIDEQ,
relationId);
scanDescriptor = systable_beginscan(pgConstraint, ConstraintRelidIndexId, true, NULL,
scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{
Oid referencedTableId = InvalidOid;
Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
if (constraintForm->contype != CONSTRAINT_FOREIGN)
{
heapTuple = systable_getnext(scanDescriptor);
continue;
}
referencedTableId = constraintForm->confrelid;
if (!IsDistributedTable(referencedTableId))
{
continue;
}
if (PartitionMethod(referencedTableId) == DISTRIBUTE_BY_NONE)
{
hasForeignKeyToReferenceTable = true;
break;
}
heapTuple = systable_getnext(scanDescriptor);
}
/* clean up scan and close system catalog */
systable_endscan(scanDescriptor);
heap_close(pgConstraint, NoLock);
return hasForeignKeyToReferenceTable;
}
/*
* TableReferenced function checks whether given table is referenced by another table
* via foreign constraints. If it is referenced, this function returns true. To check

View File

@ -662,7 +662,10 @@ WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlComma
* In case of self referencing shards, relation itself might not be distributed
* already. Therefore we cannot use ColocatedShardIdInRelation which assumes
* given relation is distributed. Besides, since we know foreign key references
* itself, referencedShardId is actual shardId anyway.
* itself, referencedShardId is actual shardId anyway. Also, if the referenced
* relation is a reference table, we cannot use ColocatedShardIdInRelation since
* reference tables only have one shard. Instead, we fetch the one and only shard
* from shardlist and use it.
*/
if (relationId == referencedRelationId)
{

View File

@ -19,6 +19,7 @@ extern void ErrorIfUnsupportedForeignConstraint(Relation relation, char
Var *distributionColumn, uint32
colocationId);
extern List * GetTableForeignConstraintCommands(Oid relationId);
extern bool HasForeignKeyToReferenceTable(Oid relationId);
extern bool TableReferenced(Oid relationId);
#endif