Decrease memory usage with rebalancer

We decrease memory usage by:
- Freeing temporary buffers
- Using separate memory context for blocks that uses "small" amount of
memory but can be repeated many times such as loops
pull/4940/head
Sait Talha Nisanci 2021-04-14 13:57:49 +03:00
parent 2f90ce931b
commit 8cabd2e822
2 changed files with 29 additions and 0 deletions

View File

@ -700,6 +700,11 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP
{ {
ShardInterval *shardInterval = NULL; ShardInterval *shardInterval = NULL;
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
"CopyShardTables",
ALLOCSET_DEFAULT_SIZES);
MemoryContext oldContext = MemoryContextSwitchTo(localContext);
/* iterate through the colocated shards and copy each */ /* iterate through the colocated shards and copy each */
foreach_ptr(shardInterval, shardIntervalList) foreach_ptr(shardInterval, shardIntervalList)
{ {
@ -719,6 +724,8 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP
tableOwner, ddlCommandList); tableOwner, ddlCommandList);
} }
MemoryContextReset(localContext);
/* /*
* Once all shards are created, we can recreate relationships between shards. * Once all shards are created, we can recreate relationships between shards.
@ -750,7 +757,10 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP
SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort,
tableOwner, commandList); tableOwner, commandList);
MemoryContextReset(localContext);
} }
MemoryContextSwitchTo(oldContext);
} }

View File

@ -452,6 +452,11 @@ citus_shard_cost_by_disk_size(PG_FUNCTION_ARGS)
/* we skip child tables of a partitioned table if this boolean variable is true */ /* we skip child tables of a partitioned table if this boolean variable is true */
bool optimizePartitionCalculations = true; bool optimizePartitionCalculations = true;
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
"CostByDiscSizeContext",
ALLOCSET_DEFAULT_SIZES);
MemoryContext oldContext = MemoryContextSwitchTo(localContext);
ShardInterval *shardInterval = LoadShardInterval(shardId); ShardInterval *shardInterval = LoadShardInterval(shardId);
List *colocatedShardList = ColocatedNonPartitionShardIntervalList(shardInterval); List *colocatedShardList = ColocatedNonPartitionShardIntervalList(shardInterval);
StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(colocatedShardList, StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(colocatedShardList,
@ -465,6 +470,7 @@ citus_shard_cost_by_disk_size(PG_FUNCTION_ARGS)
if (queryResult != RESPONSE_OKAY) if (queryResult != RESPONSE_OKAY)
{ {
MemoryContextSwitchTo(oldContext);
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("cannot get the size because of a connection error"))); errmsg("cannot get the size because of a connection error")));
} }
@ -477,12 +483,17 @@ citus_shard_cost_by_disk_size(PG_FUNCTION_ARGS)
list_length(sizeList)))); list_length(sizeList))));
} }
StringInfo tableSizeStringInfo = (StringInfo) linitial(sizeList); StringInfo tableSizeStringInfo = (StringInfo) linitial(sizeList);
char *tableSizeString = tableSizeStringInfo->data; char *tableSizeString = tableSizeStringInfo->data;
uint64 tableSize = SafeStringToUint64(tableSizeString); uint64 tableSize = SafeStringToUint64(tableSizeString);
MemoryContextSwitchTo(oldContext);
MemoryContextReset(localContext);
PQclear(result); PQclear(result);
ClearResults(connection, raiseErrors); ClearResults(connection, raiseErrors);
if (tableSize <= 0) if (tableSize <= 0)
{ {
PG_RETURN_FLOAT4(1); PG_RETURN_FLOAT4(1);
@ -601,6 +612,12 @@ ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid,
char *noticeOperation) char *noticeOperation)
{ {
List *responsiveWorkerList = GetResponsiveWorkerList(); List *responsiveWorkerList = GetResponsiveWorkerList();
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
"ExecutePlacementLoopContext",
ALLOCSET_DEFAULT_SIZES);
MemoryContext oldContext = MemoryContextSwitchTo(localContext);
ListCell *placementUpdateCell = NULL; ListCell *placementUpdateCell = NULL;
char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
@ -625,7 +642,9 @@ ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid,
))); )));
UpdateShardPlacement(placementUpdate, responsiveWorkerList, UpdateShardPlacement(placementUpdate, responsiveWorkerList,
shardReplicationModeOid); shardReplicationModeOid);
MemoryContextReset(localContext);
} }
MemoryContextSwitchTo(oldContext);
} }