fix memory leak during distribution of a table with a lot of partitions (#6722)

We have a memory leak during distribution of a table with a lot of
partitions because we do not release memory in the ExprContext until
all partitions are distributed. We made 2 improvements to resolve the
issue:

1. We create and delete MemoryContext for each call to
`CreateDistributedTable` by partitions,
2. We rebuild the metadata cache after all the placements for a shard are
inserted, instead of after each individual placement insertion.

DESCRIPTION: Fixes memory leak during distribution of a table with a lot
of partitions and shards.

Fixes https://github.com/citusdata/citus/issues/6572.
pull/6732/head
aykut-bozkurt 2023-02-17 18:12:49 +03:00 committed by GitHub
parent 756c1d3f5d
commit a7689c3f8d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 37 additions and 14 deletions

View File

@ -1116,12 +1116,27 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName,
char *relationName = get_rel_name(relationId); char *relationName = get_rel_name(relationId);
char *parentRelationName = quote_qualified_identifier(schemaName, relationName); char *parentRelationName = quote_qualified_identifier(schemaName, relationName);
/*
* when there are many partitions, each call to CreateDistributedTable
* accumulates used memory. Create and free context for each call.
*/
MemoryContext citusPartitionContext =
AllocSetContextCreate(CurrentMemoryContext,
"citus_per_partition_context",
ALLOCSET_DEFAULT_SIZES);
MemoryContext oldContext = MemoryContextSwitchTo(citusPartitionContext);
foreach_oid(partitionRelationId, partitionList) foreach_oid(partitionRelationId, partitionList)
{ {
MemoryContextReset(citusPartitionContext);
CreateDistributedTable(partitionRelationId, distributionColumnName, CreateDistributedTable(partitionRelationId, distributionColumnName,
distributionMethod, shardCount, false, distributionMethod, shardCount, false,
parentRelationName); parentRelationName);
} }
MemoryContextSwitchTo(oldContext);
MemoryContextDelete(citusPartitionContext);
} }
/* copy over data for hash distributed and reference tables */ /* copy over data for hash distributed and reference tables */

View File

@ -215,6 +215,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
{ {
bool colocatedShard = true; bool colocatedShard = true;
List *insertedShardPlacements = NIL; List *insertedShardPlacements = NIL;
List *insertedShardIds = NIL;
/* make sure that tables are hash partitioned */ /* make sure that tables are hash partitioned */
CheckHashPartitionedTable(targetRelationId); CheckHashPartitionedTable(targetRelationId);
@ -254,7 +255,9 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
foreach_ptr(sourceShardInterval, sourceShardIntervalList) foreach_ptr(sourceShardInterval, sourceShardIntervalList)
{ {
uint64 sourceShardId = sourceShardInterval->shardId; uint64 sourceShardId = sourceShardInterval->shardId;
uint64 newShardId = GetNextShardId(); uint64 *newShardIdPtr = (uint64 *) palloc0(sizeof(uint64));
*newShardIdPtr = GetNextShardId();
insertedShardIds = lappend(insertedShardIds, newShardIdPtr);
int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue); int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue);
int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue); int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue);
@ -263,7 +266,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
List *sourceShardPlacementList = ShardPlacementListSortedByWorker( List *sourceShardPlacementList = ShardPlacementListSortedByWorker(
sourceShardId); sourceShardId);
InsertShardRow(targetRelationId, newShardId, targetShardStorageType, InsertShardRow(targetRelationId, *newShardIdPtr, targetShardStorageType,
shardMinValueText, shardMaxValueText); shardMinValueText, shardMaxValueText);
ShardPlacement *sourcePlacement = NULL; ShardPlacement *sourcePlacement = NULL;
@ -272,21 +275,26 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
int32 groupId = sourcePlacement->groupId; int32 groupId = sourcePlacement->groupId;
const uint64 shardSize = 0; const uint64 shardSize = 0;
/* InsertShardPlacementRow(*newShardIdPtr,
* Optimistically add shard placement row the pg_dist_shard_placement, in case
* of any error it will be roll-backed.
*/
uint64 shardPlacementId = InsertShardPlacementRow(newShardId,
INVALID_PLACEMENT_ID, INVALID_PLACEMENT_ID,
shardSize, shardSize,
groupId); groupId);
ShardPlacement *shardPlacement = LoadShardPlacement(newShardId,
shardPlacementId);
insertedShardPlacements = lappend(insertedShardPlacements, shardPlacement);
} }
} }
/*
* load shard placements for the shard at once after all placement insertions
* finished. That prevents MetadataCache from rebuilding unnecessarily after
* each placement insertion.
*/
uint64 *shardIdPtr;
foreach_ptr(shardIdPtr, insertedShardIds)
{
List *placementsForShard = ShardPlacementList(*shardIdPtr);
insertedShardPlacements = list_concat(insertedShardPlacements,
placementsForShard);
}
CreateShardsOnWorkers(targetRelationId, insertedShardPlacements, CreateShardsOnWorkers(targetRelationId, insertedShardPlacements,
useExclusiveConnections, colocatedShard); useExclusiveConnections, colocatedShard);
} }