diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index ee00c6c54..be2313f0e 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -700,6 +700,11 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP { ShardInterval *shardInterval = NULL; + MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, + "CopyShardTables", + ALLOCSET_DEFAULT_SIZES); + MemoryContext oldContext = MemoryContextSwitchTo(localContext); + /* iterate through the colocated shards and copy each */ foreach_ptr(shardInterval, shardIntervalList) { @@ -719,6 +724,8 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP tableOwner, ddlCommandList); } + MemoryContextReset(localContext); + /* * Once all shards are created, we can recreate relationships between shards. @@ -750,7 +757,10 @@ CopyShardTables(List *shardIntervalList, char *sourceNodeName, int32 sourceNodeP SendCommandListToWorkerInSingleTransaction(targetNodeName, targetNodePort, tableOwner, commandList); + MemoryContextReset(localContext); } + + MemoryContextSwitchTo(oldContext); } diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index df35bc861..b0ced956b 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -452,6 +452,11 @@ citus_shard_cost_by_disk_size(PG_FUNCTION_ARGS) /* we skip child tables of a partitioned table if this boolean variable is true */ bool optimizePartitionCalculations = true; + + MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, + "CostByDiscSizeContext", + ALLOCSET_DEFAULT_SIZES); + MemoryContext oldContext = MemoryContextSwitchTo(localContext); ShardInterval *shardInterval = LoadShardInterval(shardId); List *colocatedShardList = ColocatedNonPartitionShardIntervalList(shardInterval); StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(colocatedShardList, @@ -465,6 +470,7 @@ citus_shard_cost_by_disk_size(PG_FUNCTION_ARGS) if (queryResult != RESPONSE_OKAY) { + MemoryContextSwitchTo(oldContext); ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("cannot get the size because of a connection error"))); } @@ -477,12 +483,17 @@ citus_shard_cost_by_disk_size(PG_FUNCTION_ARGS) list_length(sizeList)))); } + StringInfo tableSizeStringInfo = (StringInfo) linitial(sizeList); char *tableSizeString = tableSizeStringInfo->data; uint64 tableSize = SafeStringToUint64(tableSizeString); + MemoryContextSwitchTo(oldContext); + MemoryContextReset(localContext); + PQclear(result); ClearResults(connection, raiseErrors); + if (tableSize <= 0) { PG_RETURN_FLOAT4(1); @@ -601,6 +612,12 @@ ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid, char *noticeOperation) { List *responsiveWorkerList = GetResponsiveWorkerList(); + + MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, + "ExecutePlacementLoopContext", + ALLOCSET_DEFAULT_SIZES); + MemoryContext oldContext = MemoryContextSwitchTo(localContext); + ListCell *placementUpdateCell = NULL; char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); @@ -625,7 +642,9 @@ ExecutePlacementUpdates(List *placementUpdateList, Oid shardReplicationModeOid, ))); UpdateShardPlacement(placementUpdate, responsiveWorkerList, shardReplicationModeOid); + MemoryContextReset(localContext); } + MemoryContextSwitchTo(oldContext); }