fix memory leak during altering distributed table with a lot of partition and shards (#6726)

2 improvements to prevent memory leaks during altering or undistributing
distributed tables with a lot of partitions and shards:

1. Free memory for each call to ConvertTable so that colocated and partition tables at
`AlterDistributedTable`, `UndistributeTable`, or
`AlterTableSetAccessMethod` will not cause an increase
in memory usage,
2. Free memory while executing attach partition commands for each partition table at
`AlterDistributedTable` to prevent an increase in memory usage.

DESCRIPTION: Fixes memory leak issue during altering distributed table
with a lot of partition and shards.

Fixes https://github.com/citusdata/citus/issues/6503.
pull/6740/head^2
aykut-bozkurt 2023-02-28 21:23:41 +03:00 committed by GitHub
parent 17ad61678f
commit e2654deeae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 69 additions and 4 deletions

View File

@ -183,6 +183,7 @@ static TableConversionReturn * AlterDistributedTable(TableConversionParameters *
static TableConversionReturn * AlterTableSetAccessMethod( static TableConversionReturn * AlterTableSetAccessMethod(
TableConversionParameters *params); TableConversionParameters *params);
static TableConversionReturn * ConvertTable(TableConversionState *con); static TableConversionReturn * ConvertTable(TableConversionState *con);
static TableConversionReturn * ConvertTableInternal(TableConversionState *con);
static bool SwitchToSequentialAndLocalExecutionIfShardNameTooLong(char *relationName, static bool SwitchToSequentialAndLocalExecutionIfShardNameTooLong(char *relationName,
char *longestShardName); char *longestShardName);
static void DropIndexesNotSupportedByColumnar(Oid relationId, static void DropIndexesNotSupportedByColumnar(Oid relationId,
@ -216,6 +217,8 @@ static bool WillRecreateForeignKeyToReferenceTable(Oid relationId,
static void WarningsForDroppingForeignKeysWithDistributedTables(Oid relationId); static void WarningsForDroppingForeignKeysWithDistributedTables(Oid relationId);
static void ErrorIfUnsupportedCascadeObjects(Oid relationId); static void ErrorIfUnsupportedCascadeObjects(Oid relationId);
static bool DoesCascadeDropUnsupportedObject(Oid classId, Oid id, HTAB *nodeMap); static bool DoesCascadeDropUnsupportedObject(Oid classId, Oid id, HTAB *nodeMap);
static TableConversionReturn * CopyTableConversionReturnIntoCurrentContext(
TableConversionReturn *tableConversionReturn);
PG_FUNCTION_INFO_V1(undistribute_table); PG_FUNCTION_INFO_V1(undistribute_table);
PG_FUNCTION_INFO_V1(alter_distributed_table); PG_FUNCTION_INFO_V1(alter_distributed_table);
@ -402,6 +405,7 @@ UndistributeTable(TableConversionParameters *params)
params->conversionType = UNDISTRIBUTE_TABLE; params->conversionType = UNDISTRIBUTE_TABLE;
params->shardCountIsNull = true; params->shardCountIsNull = true;
TableConversionState *con = CreateTableConversion(params); TableConversionState *con = CreateTableConversion(params);
return ConvertTable(con); return ConvertTable(con);
} }
@ -441,6 +445,7 @@ AlterDistributedTable(TableConversionParameters *params)
ereport(DEBUG1, (errmsg("setting multi shard modify mode to sequential"))); ereport(DEBUG1, (errmsg("setting multi shard modify mode to sequential")));
SetLocalMultiShardModifyModeToSequential(); SetLocalMultiShardModifyModeToSequential();
} }
return ConvertTable(con); return ConvertTable(con);
} }
@ -511,9 +516,9 @@ AlterTableSetAccessMethod(TableConversionParameters *params)
/* /*
* ConvertTable is used for converting a table into a new table with different properties. * ConvertTableInternal is used for converting a table into a new table with different
* The conversion is done by creating a new table, moving everything to the new table and * properties. The conversion is done by creating a new table, moving everything to the
* dropping the old one. So the oid of the table is not preserved. * new table and dropping the old one. So the oid of the table is not preserved.
* *
* The new table will have the same name, columns and rows. It will also have partitions, * The new table will have the same name, columns and rows. It will also have partitions,
* views, sequences of the old table. Finally it will have everything created by * views, sequences of the old table. Finally it will have everything created by
@ -532,7 +537,7 @@ AlterTableSetAccessMethod(TableConversionParameters *params)
* in case you add a new way to return from this function. * in case you add a new way to return from this function.
*/ */
TableConversionReturn * TableConversionReturn *
ConvertTable(TableConversionState *con) ConvertTableInternal(TableConversionState *con)
{ {
InTableTypeConversionFunctionCall = true; InTableTypeConversionFunctionCall = true;
@ -884,10 +889,70 @@ ConvertTable(TableConversionState *con)
SetLocalEnableLocalReferenceForeignKeys(oldEnableLocalReferenceForeignKeys); SetLocalEnableLocalReferenceForeignKeys(oldEnableLocalReferenceForeignKeys);
InTableTypeConversionFunctionCall = false; InTableTypeConversionFunctionCall = false;
return ret; return ret;
} }
/*
* CopyTableConversionReturnIntoCurrentContext copies given tableConversionReturn
* into CurrentMemoryContext.
*/
static TableConversionReturn *
CopyTableConversionReturnIntoCurrentContext(TableConversionReturn *tableConversionReturn)
{
TableConversionReturn *tableConversionReturnCopy = NULL;
if (tableConversionReturn)
{
tableConversionReturnCopy = palloc0(sizeof(TableConversionReturn));
List *copyForeignKeyCommands = NIL;
char *foreignKeyCommand = NULL;
foreach_ptr(foreignKeyCommand, tableConversionReturn->foreignKeyCommands)
{
char *copyForeignKeyCommand = MemoryContextStrdup(CurrentMemoryContext,
foreignKeyCommand);
copyForeignKeyCommands = lappend(copyForeignKeyCommands,
copyForeignKeyCommand);
}
tableConversionReturnCopy->foreignKeyCommands = copyForeignKeyCommands;
}
return tableConversionReturnCopy;
}
/*
* ConvertTable is a wrapper for ConvertTableInternal to persist only
* TableConversionReturn and delete all other allocations.
*/
static TableConversionReturn *
ConvertTable(TableConversionState *con)
{
/*
* when there are many partitions or colocated tables, memory usage is
* accumulated. Free context for each call to ConvertTable.
*/
MemoryContext convertTableContext =
AllocSetContextCreate(CurrentMemoryContext,
"citus_convert_table_context",
ALLOCSET_DEFAULT_SIZES);
MemoryContext oldContext = MemoryContextSwitchTo(convertTableContext);
TableConversionReturn *tableConversionReturn = ConvertTableInternal(con);
MemoryContextSwitchTo(oldContext);
/* persist TableConversionReturn in oldContext */
TableConversionReturn *tableConversionReturnCopy =
CopyTableConversionReturnIntoCurrentContext(tableConversionReturn);
/* delete convertTableContext */
MemoryContextDelete(convertTableContext);
return tableConversionReturnCopy;
}
/* /*
* DropIndexesNotSupportedByColumnar is a helper function used during accces * DropIndexesNotSupportedByColumnar is a helper function used during accces
* method conversion to drop the indexes that are not supported by columnarAM. * method conversion to drop the indexes that are not supported by columnarAM.