mirror of https://github.com/citusdata/citus.git
Merge pull request #4940 from citusdata/reduce_memory_usage_rebalancer
Decrease memory usage with rebalancerupdate_ci_images
commit
ca5d281784
|
@ -700,6 +700,11 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP
|
||||||
{
|
{
|
||||||
ShardInterval *shardInterval = NULL;
|
ShardInterval *shardInterval = NULL;
|
||||||
|
|
||||||
|
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
|
||||||
|
"CopyShardTables",
|
||||||
|
ALLOCSET_DEFAULT_SIZES);
|
||||||
|
MemoryContext oldContext = MemoryContextSwitchTo(localContext);
|
||||||
|
|
||||||
/* iterate through the colocated shards and copy each */
|
/* iterate through the colocated shards and copy each */
|
||||||
foreach_ptr(shardInterval, shardIntervalList)
|
foreach_ptr(shardInterval, shardIntervalList)
|
||||||
{
|
{
|
||||||
|
@ -719,6 +724,8 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP
|
||||||
tableOwner, ddlCommandList);
|
tableOwner, ddlCommandList);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
MemoryContextReset(localContext);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Once all shards are created, we can recreate relationships between shards.
|
* Once all shards are created, we can recreate relationships between shards.
|
||||||
|
@ -750,7 +757,10 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP
|
||||||
|
|
||||||
SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort,
|
SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort,
|
||||||
tableOwner, commandList);
|
tableOwner, commandList);
|
||||||
|
MemoryContextReset(localContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
MemoryContextSwitchTo(oldContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -452,6 +452,11 @@ citus_shard_cost_by_disk_size(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
/* we skip child tables of a partitioned table if this boolean variable is true */
|
/* we skip child tables of a partitioned table if this boolean variable is true */
|
||||||
bool optimizePartitionCalculations = true;
|
bool optimizePartitionCalculations = true;
|
||||||
|
|
||||||
|
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
|
||||||
|
"CostByDiscSizeContext",
|
||||||
|
ALLOCSET_DEFAULT_SIZES);
|
||||||
|
MemoryContext oldContext = MemoryContextSwitchTo(localContext);
|
||||||
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
||||||
List *colocatedShardList = ColocatedNonPartitionShardIntervalList(shardInterval);
|
List *colocatedShardList = ColocatedNonPartitionShardIntervalList(shardInterval);
|
||||||
StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(colocatedShardList,
|
StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(colocatedShardList,
|
||||||
|
@ -465,6 +470,7 @@ citus_shard_cost_by_disk_size(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
if (queryResult != RESPONSE_OKAY)
|
if (queryResult != RESPONSE_OKAY)
|
||||||
{
|
{
|
||||||
|
MemoryContextSwitchTo(oldContext);
|
||||||
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
|
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
|
||||||
errmsg("cannot get the size because of a connection error")));
|
errmsg("cannot get the size because of a connection error")));
|
||||||
}
|
}
|
||||||
|
@ -477,12 +483,17 @@ citus_shard_cost_by_disk_size(PG_FUNCTION_ARGS)
|
||||||
list_length(sizeList))));
|
list_length(sizeList))));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
StringInfo tableSizeStringInfo = (StringInfo) linitial(sizeList);
|
StringInfo tableSizeStringInfo = (StringInfo) linitial(sizeList);
|
||||||
char *tableSizeString = tableSizeStringInfo->data;
|
char *tableSizeString = tableSizeStringInfo->data;
|
||||||
uint64 tableSize = SafeStringToUint64(tableSizeString);
|
uint64 tableSize = SafeStringToUint64(tableSizeString);
|
||||||
|
|
||||||
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
MemoryContextReset(localContext);
|
||||||
|
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
ClearResults(connection, raiseErrors);
|
ClearResults(connection, raiseErrors);
|
||||||
|
|
||||||
if (tableSize <= 0)
|
if (tableSize <= 0)
|
||||||
{
|
{
|
||||||
PG_RETURN_FLOAT4(1);
|
PG_RETURN_FLOAT4(1);
|
||||||
|
@ -601,6 +612,12 @@ ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid,
|
||||||
char *noticeOperation)
|
char *noticeOperation)
|
||||||
{
|
{
|
||||||
List *responsiveWorkerList = GetResponsiveWorkerList();
|
List *responsiveWorkerList = GetResponsiveWorkerList();
|
||||||
|
|
||||||
|
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
|
||||||
|
"ExecutePlacementLoopContext",
|
||||||
|
ALLOCSET_DEFAULT_SIZES);
|
||||||
|
MemoryContext oldContext = MemoryContextSwitchTo(localContext);
|
||||||
|
|
||||||
ListCell *placementUpdateCell = NULL;
|
ListCell *placementUpdateCell = NULL;
|
||||||
|
|
||||||
char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
|
char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
|
||||||
|
@ -625,7 +642,9 @@ ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid,
|
||||||
)));
|
)));
|
||||||
UpdateShardPlacement(placementUpdate, responsiveWorkerList,
|
UpdateShardPlacement(placementUpdate, responsiveWorkerList,
|
||||||
shardReplicationModeOid);
|
shardReplicationModeOid);
|
||||||
|
MemoryContextReset(localContext);
|
||||||
}
|
}
|
||||||
|
MemoryContextSwitchTo(oldContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue