Delete unused colocation groups after mark_tables_colocated()

pull/1012/head
Metin Doslu 2016-12-13 13:53:20 +02:00
parent 18d8a72ca8
commit 41bb6fb3be
3 changed files with 153 additions and 6 deletions

View File

@ -42,6 +42,8 @@ static int CompareShardPlacementsByNode(const void *leftElement,
const void *rightElement);
static uint32 GetNextColocationId(void);
static void UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId);
static List * ColocationGroupTableList(Oid colocationId);
static void DeleteColocationGroup(uint32 colocationId);
/* exports for SQL callable functions */
@ -91,6 +93,7 @@ static void
MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId)
{
uint32 sourceColocationId = INVALID_COLOCATION_ID;
uint32 targetColocationId = INVALID_COLOCATION_ID;
Relation pgDistColocation = NULL;
Var *sourceDistributionColumn = NULL;
Var *targetDistributionColumn = NULL;
@ -154,9 +157,23 @@ MarkTablesColocated(Oid sourceRelationId, Oid targetRelationId)
UpdateRelationColocationGroup(sourceRelationId, sourceColocationId);
}
targetColocationId = TableColocationId(targetRelationId);
/* finally set colocation group for the target relation */
UpdateRelationColocationGroup(targetRelationId, sourceColocationId);
/* if there is not any remaining table in the colocation group, delete it */
if (targetColocationId != INVALID_COLOCATION_ID)
{
List *colocatedTableList = ColocationGroupTableList(targetColocationId);
int colocatedTableCount = list_length(colocatedTableList);
if (colocatedTableCount == 0)
{
DeleteColocationGroup(targetColocationId);
}
}
heap_close(pgDistColocation, NoLock);
}
@ -622,6 +639,30 @@ ColocatedTableList(Oid distributedTableId)
uint32 tableColocationId = TableColocationId(distributedTableId);
List *colocatedTableList = NIL;
/*
* If distribution type of the table is not hash, the table is only co-located
* with itself.
*/
if (tableColocationId == INVALID_COLOCATION_ID)
{
colocatedTableList = lappend_oid(colocatedTableList, distributedTableId);
return colocatedTableList;
}
colocatedTableList = ColocationGroupTableList(tableColocationId);
return colocatedTableList;
}
/*
* ColocationGroupTableList returns the list of tables in the given colocation
* group. If the colocation group is INVALID_COLOCATION_ID, it returns NIL.
*/
static List *
ColocationGroupTableList(Oid colocationId)
{
List *colocatedTableList = NIL;
Relation pgDistPartition = NULL;
TupleDesc tupleDescriptor = NULL;
SysScanDesc scanDescriptor = NULL;
@ -634,14 +675,13 @@ ColocatedTableList(Oid distributedTableId)
* If distribution type of the table is not hash, the table is only co-located
* with itself.
*/
if (tableColocationId == INVALID_COLOCATION_ID)
if (colocationId == INVALID_COLOCATION_ID)
{
colocatedTableList = lappend_oid(colocatedTableList, distributedTableId);
return colocatedTableList;
return NIL;
}
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid,
BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(tableColocationId));
BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(colocationId));
pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock);
tupleDescriptor = RelationGetDescr(pgDistPartition);
@ -780,3 +820,44 @@ ColocatedShardIdInRelation(Oid relationId, int shardIndex)
return tableCacheEntry->sortedShardIntervalArray[shardIndex]->shardId;
}
/*
* DeleteColocationGroup deletes the colocation group from pg_dist_colocation.
*/
static void
DeleteColocationGroup(uint32 colocationId)
{
Relation pgDistColocation = NULL;
SysScanDesc scanDescriptor = NULL;
int scanKeyCount = 1;
ScanKeyData scanKey[scanKeyCount];
bool indexOK = false;
HeapTuple heapTuple = NULL;
pgDistColocation = heap_open(DistColocationRelationId(), RowExclusiveLock);
ScanKeyInit(&scanKey[0], Anum_pg_dist_colocation_colocationid,
BTEqualStrategyNumber, F_INT4EQ, UInt32GetDatum(colocationId));
scanDescriptor = systable_beginscan(pgDistColocation, InvalidOid, indexOK,
NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple))
{
ereport(ERROR, (errmsg("could not find valid entry for colocation group %d",
colocationId)));
}
simple_heap_delete(pgDistColocation, &(heapTuple->t_self));
CatalogUpdateIndexes(pgDistColocation, heapTuple);
CitusInvalidateRelcacheByRelid(DistColocationRelationId());
systable_endscan(scanDescriptor);
heap_close(pgDistColocation, RowExclusiveLock);
/* increment the counter so that next command can see the row */
CommandCounterIncrement();
}

View File

@ -954,6 +954,14 @@ SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupB']);
(1 row)
SET citus.shard_count = 2;
CREATE TABLE table5_groupE ( id int );
SELECT create_distributed_table('table5_groupE', 'id', colocate_with => 'NONE');
create_distributed_table
--------------------------
(1 row)
-- check metadata to see colocation groups are created successfully
SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000
@ -965,7 +973,8 @@ SELECT * FROM pg_dist_colocation
4 | 4 | 2 | 23 | t
5 | 2 | 2 | 23 | t
6 | 1 | 2 | 23 | t
(5 rows)
7 | 2 | 2 | 23 | f
(6 rows)
SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000
@ -983,5 +992,45 @@ SELECT logicalrelid, colocationid FROM pg_dist_partition
table3_groupe | 5
table1_groupf | 6
table2_groupf | 6
(11 rows)
table5_groupe | 7
(12 rows)
-- move the only table in colocation group 7 to colocation group 5
SELECT mark_tables_colocated('table1_groupE', ARRAY['table5_groupE']);
mark_tables_colocated
-----------------------
(1 row)
-- check metadata to see that unused colocation group is deleted
SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid;
colocationid | shardcount | replicationfactor | distributioncolumntype | defaultgroup
--------------+------------+-------------------+------------------------+--------------
2 | 2 | 1 | 23 | t
3 | 2 | 2 | 25 | t
4 | 4 | 2 | 23 | t
5 | 2 | 2 | 23 | t
6 | 1 | 2 | 23 | t
(5 rows)
SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid, logicalrelid;
logicalrelid | colocationid
---------------+--------------
table1_groupb | 2
table2_groupb | 2
table1_groupc | 3
table2_groupc | 3
table1_groupd | 4
table2_groupd | 4
table1_groupe | 5
table2_groupe | 5
table3_groupe | 5
table5_groupe | 5
table1_groupf | 6
table2_groupf | 6
(12 rows)

View File

@ -398,6 +398,11 @@ SELECT mark_tables_colocated('table1_groupF', ARRAY['table2_groupF']);
-- check to colocate with itself
SELECT mark_tables_colocated('table1_groupB', ARRAY['table1_groupB']);
SET citus.shard_count = 2;
CREATE TABLE table5_groupE ( id int );
SELECT create_distributed_table('table5_groupE', 'id', colocate_with => 'NONE');
-- check metadata to see colocation groups are created successfully
SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000
@ -406,3 +411,15 @@ SELECT * FROM pg_dist_colocation
SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY logicalrelid;
-- move the only table in colocation group 7 to colocation group 5
SELECT mark_tables_colocated('table1_groupE', ARRAY['table5_groupE']);
-- check metadata to see that unused colocation group is deleted
SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid;
SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid, logicalrelid;