From a7689c3f8de41f1157f707f41da6478b70ed7e36 Mon Sep 17 00:00:00 2001
From: aykut-bozkurt <51649454+aykut-bozkurt@users.noreply.github.com>
Date: Fri, 17 Feb 2023 18:12:49 +0300
Subject: [PATCH] fix memory leak during distribution of a table with a lot of
 partitions (#6722)

We have a memory leak during distribution of a table with a lot of
partitions, because memory allocated in the ExprContext is not released
until all partitions have been distributed. We made two improvements to
resolve the issue:

1. We create and delete a MemoryContext for each `CreateDistributedTable`
   call made for a partition,
2. We rebuild the metadata cache once after all placements are inserted,
   instead of after each placement insertion for a shard.

A standalone sketch of the per-call context pattern in (1) follows the
diff.

DESCRIPTION: Fixes memory leak during distribution of a table with a
lot of partitions and shards.

Fixes https://github.com/citusdata/citus/issues/6572.
---
 .../commands/create_distributed_table.c      | 15 ++++++++
 .../distributed/operations/create_shards.c   | 36 +++++++++++--------
 2 files changed, 37 insertions(+), 14 deletions(-)

diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c
index 0bea11034..86133322d 100644
--- a/src/backend/distributed/commands/create_distributed_table.c
+++ b/src/backend/distributed/commands/create_distributed_table.c
@@ -1116,12 +1116,27 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName,
 		char *relationName = get_rel_name(relationId);
 		char *parentRelationName = quote_qualified_identifier(schemaName,
 															  relationName);
+		/*
+		 * When there are many partitions, each call to CreateDistributedTable
+		 * accumulates used memory. Create and free a context for each call.
+		 */
+		MemoryContext citusPartitionContext =
+			AllocSetContextCreate(CurrentMemoryContext,
+								  "citus_per_partition_context",
+								  ALLOCSET_DEFAULT_SIZES);
+		MemoryContext oldContext = MemoryContextSwitchTo(citusPartitionContext);
+
 		foreach_oid(partitionRelationId, partitionList)
 		{
+			MemoryContextReset(citusPartitionContext);
+
 			CreateDistributedTable(partitionRelationId, distributionColumnName,
 								   distributionMethod, shardCount, false,
 								   parentRelationName);
 		}
+
+		MemoryContextSwitchTo(oldContext);
+		MemoryContextDelete(citusPartitionContext);
 	}
 
 	/* copy over data for hash distributed and reference tables */
diff --git a/src/backend/distributed/operations/create_shards.c b/src/backend/distributed/operations/create_shards.c
index 282993d7b..3edab94e9 100644
--- a/src/backend/distributed/operations/create_shards.c
+++ b/src/backend/distributed/operations/create_shards.c
@@ -215,6 +215,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
 {
 	bool colocatedShard = true;
 	List *insertedShardPlacements = NIL;
+	List *insertedShardIds = NIL;
 
 	/* make sure that tables are hash partitioned */
 	CheckHashPartitionedTable(targetRelationId);
@@ -254,7 +255,9 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
 	foreach_ptr(sourceShardInterval, sourceShardIntervalList)
 	{
 		uint64 sourceShardId = sourceShardInterval->shardId;
-		uint64 newShardId = GetNextShardId();
+		uint64 *newShardIdPtr = (uint64 *) palloc0(sizeof(uint64));
+		*newShardIdPtr = GetNextShardId();
+		insertedShardIds = lappend(insertedShardIds, newShardIdPtr);
 
 		int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue);
 		int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue);
@@ -263,7 +266,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
 		List *sourceShardPlacementList = ShardPlacementListSortedByWorker(
 			sourceShardId);
 
-		InsertShardRow(targetRelationId, newShardId, targetShardStorageType,
+		InsertShardRow(targetRelationId, *newShardIdPtr, targetShardStorageType,
 					   shardMinValueText, shardMaxValueText);
 
 		ShardPlacement *sourcePlacement = NULL;
@@ -272,21 +275,26 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
 			int32 groupId = sourcePlacement->groupId;
 			const uint64 shardSize = 0;
 
-			/*
-			 * Optimistically add shard placement row the pg_dist_shard_placement, in case
-			 * of any error it will be roll-backed.
-			 */
-			uint64 shardPlacementId = InsertShardPlacementRow(newShardId,
-															  INVALID_PLACEMENT_ID,
-															  shardSize,
-															  groupId);
-
-			ShardPlacement *shardPlacement = LoadShardPlacement(newShardId,
-																shardPlacementId);
-			insertedShardPlacements = lappend(insertedShardPlacements, shardPlacement);
+			InsertShardPlacementRow(*newShardIdPtr,
+									INVALID_PLACEMENT_ID,
+									shardSize,
+									groupId);
 		}
 	}
 
+	/*
+	 * Load the placements for all shards at once after every placement
+	 * insertion has finished. This prevents the metadata cache from being
+	 * rebuilt unnecessarily after each placement insertion.
+	 */
+	uint64 *shardIdPtr;
+	foreach_ptr(shardIdPtr, insertedShardIds)
+	{
+		List *placementsForShard = ShardPlacementList(*shardIdPtr);
+		insertedShardPlacements = list_concat(insertedShardPlacements,
+											  placementsForShard);
+	}
+
 	CreateShardsOnWorkers(targetRelationId, insertedShardPlacements,
 						  useExclusiveConnections, colocatedShard);
 }
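
For readers less familiar with PostgreSQL memory management, here is a
minimal standalone sketch of the per-call context pattern that improvement
(1) applies above. It is not code from the patch: ProcessOneItem and
ProcessItemsWithBoundedMemory are hypothetical stand-ins for
CreateDistributedTable and its partition loop; only the memory-context API
calls (AllocSetContextCreate, MemoryContextSwitchTo, MemoryContextReset,
MemoryContextDelete) match what the diff uses.

	#include "postgres.h"

	#include "nodes/pg_list.h"
	#include "utils/memutils.h"

	/* hypothetical per-item worker; allocations land in the current context */
	static void
	ProcessOneItem(Oid itemId)
	{
		(void) itemId;
	}

	/*
	 * Process every item while keeping peak memory proportional to a single
	 * item rather than to the whole list.
	 */
	static void
	ProcessItemsWithBoundedMemory(List *itemIdList)
	{
		/* child of the current context; deleting it frees everything below */
		MemoryContext perItemContext =
			AllocSetContextCreate(CurrentMemoryContext,
								  "per_item_context",
								  ALLOCSET_DEFAULT_SIZES);
		MemoryContext oldContext = MemoryContextSwitchTo(perItemContext);

		ListCell *itemCell = NULL;
		foreach(itemCell, itemIdList)
		{
			/* release everything the previous iteration allocated */
			MemoryContextReset(perItemContext);

			ProcessOneItem(lfirst_oid(itemCell));
		}

		/* restore the caller's context and drop the scratch context */
		MemoryContextSwitchTo(oldContext);
		MemoryContextDelete(perItemContext);
	}

Resetting perItemContext inside the loop is safe even while it is the
current context: MemoryContextReset frees the context's allocations without
invalidating the context itself, which is exactly what the diff relies on
with citusPartitionContext.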