mirror of https://github.com/citusdata/citus.git
Reduce memory consumption while adjust partition index names
Previously, CreateFixPartitionShardIndexNames() created all
the relevant query strings for all the shards, and executed
the large query string. And, in terms of the memory consumption,
this huge command (and its ExprContext generated while running
the command) is the main bottleneck/
With this change, we are reducing the total amount of memory
usage to almost 1/shard_count.
On my local machine, a distributed partitioned table with 120 partitions,
each 32 shards, the total memory consumption reduced from ~3GB
to ~0.1GB. And, the total execution time increased from ~28 seconds
to ~30 seconds. This seems like a good trade-off.
(cherry picked from commit b8008999dc
)
release-11-onder-27-july
parent
fcdf4434c6
commit
377375de2a
|
@ -53,7 +53,7 @@ static Relation try_relation_open_nolock(Oid relationId);
|
||||||
static List * CreateFixPartitionConstraintsTaskList(Oid relationId);
|
static List * CreateFixPartitionConstraintsTaskList(Oid relationId);
|
||||||
static List * WorkerFixPartitionConstraintCommandList(Oid relationId, uint64 shardId,
|
static List * WorkerFixPartitionConstraintCommandList(Oid relationId, uint64 shardId,
|
||||||
List *checkConstraintList);
|
List *checkConstraintList);
|
||||||
static List * CreateFixPartitionShardIndexNamesTaskList(Oid parentRelationId,
|
static void CreateFixPartitionShardIndexNames(Oid parentRelationId,
|
||||||
Oid partitionRelationId,
|
Oid partitionRelationId,
|
||||||
Oid parentIndexOid);
|
Oid parentIndexOid);
|
||||||
static List * WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId,
|
static List * WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId,
|
||||||
|
@ -329,18 +329,10 @@ FixPartitionShardIndexNames(Oid relationId, Oid parentIndexOid)
|
||||||
RelationGetRelationName(relation))));
|
RelationGetRelationName(relation))));
|
||||||
}
|
}
|
||||||
|
|
||||||
List *taskList =
|
CreateFixPartitionShardIndexNames(parentRelationId,
|
||||||
CreateFixPartitionShardIndexNamesTaskList(parentRelationId,
|
|
||||||
partitionRelationId,
|
partitionRelationId,
|
||||||
parentIndexOid);
|
parentIndexOid);
|
||||||
|
|
||||||
/* do not do anything if there are no index names to fix */
|
|
||||||
if (taskList != NIL)
|
|
||||||
{
|
|
||||||
bool localExecutionSupported = true;
|
|
||||||
ExecuteUtilityTaskList(taskList, localExecutionSupported);
|
|
||||||
}
|
|
||||||
|
|
||||||
relation_close(relation, NoLock);
|
relation_close(relation, NoLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -494,15 +486,15 @@ WorkerFixPartitionConstraintCommandList(Oid relationId, uint64 shardId,
|
||||||
* partition each task will have parent_indexes_count query strings. When we need
|
* partition each task will have parent_indexes_count query strings. When we need
|
||||||
* to fix a single index, parent_indexes_count becomes 1.
|
* to fix a single index, parent_indexes_count becomes 1.
|
||||||
*/
|
*/
|
||||||
static List *
|
static void
|
||||||
CreateFixPartitionShardIndexNamesTaskList(Oid parentRelationId, Oid partitionRelationId,
|
CreateFixPartitionShardIndexNames(Oid parentRelationId, Oid partitionRelationId,
|
||||||
Oid parentIndexOid)
|
Oid parentIndexOid)
|
||||||
{
|
{
|
||||||
List *partitionList = PartitionList(parentRelationId);
|
List *partitionList = PartitionList(parentRelationId);
|
||||||
if (partitionList == NIL)
|
if (partitionList == NIL)
|
||||||
{
|
{
|
||||||
/* early exit if the parent relation does not have any partitions */
|
/* early exit if the parent relation does not have any partitions */
|
||||||
return NIL;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
Relation parentRelation = RelationIdGetRelation(parentRelationId);
|
Relation parentRelation = RelationIdGetRelation(parentRelationId);
|
||||||
|
@ -521,7 +513,7 @@ CreateFixPartitionShardIndexNamesTaskList(Oid parentRelationId, Oid partitionRel
|
||||||
{
|
{
|
||||||
/* early exit if the parent relation does not have any indexes */
|
/* early exit if the parent relation does not have any indexes */
|
||||||
RelationClose(parentRelation);
|
RelationClose(parentRelation);
|
||||||
return NIL;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -549,8 +541,12 @@ CreateFixPartitionShardIndexNamesTaskList(Oid parentRelationId, Oid partitionRel
|
||||||
/* lock metadata before getting placement lists */
|
/* lock metadata before getting placement lists */
|
||||||
LockShardListMetadata(parentShardIntervalList, ShareLock);
|
LockShardListMetadata(parentShardIntervalList, ShareLock);
|
||||||
|
|
||||||
|
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
|
||||||
|
"CreateFixPartitionShardIndexNames",
|
||||||
|
ALLOCSET_DEFAULT_SIZES);
|
||||||
|
MemoryContext oldContext = MemoryContextSwitchTo(localContext);
|
||||||
|
|
||||||
int taskId = 1;
|
int taskId = 1;
|
||||||
List *taskList = NIL;
|
|
||||||
|
|
||||||
ShardInterval *parentShardInterval = NULL;
|
ShardInterval *parentShardInterval = NULL;
|
||||||
foreach_ptr(parentShardInterval, parentShardIntervalList)
|
foreach_ptr(parentShardInterval, parentShardIntervalList)
|
||||||
|
@ -561,24 +557,14 @@ CreateFixPartitionShardIndexNamesTaskList(Oid parentRelationId, Oid partitionRel
|
||||||
WorkerFixPartitionShardIndexNamesCommandList(parentShardId,
|
WorkerFixPartitionShardIndexNamesCommandList(parentShardId,
|
||||||
parentIndexIdList,
|
parentIndexIdList,
|
||||||
partitionRelationId);
|
partitionRelationId);
|
||||||
|
|
||||||
if (queryStringList != NIL)
|
if (queryStringList != NIL)
|
||||||
{
|
{
|
||||||
Task *task = CitusMakeNode(Task);
|
Task *task = CitusMakeNode(Task);
|
||||||
task->jobId = INVALID_JOB_ID;
|
task->jobId = INVALID_JOB_ID;
|
||||||
task->taskId = taskId++;
|
task->taskId = taskId++;
|
||||||
|
|
||||||
task->taskType = DDL_TASK;
|
task->taskType = DDL_TASK;
|
||||||
|
|
||||||
/*
|
|
||||||
* There could be O(#partitions * #indexes) queries in
|
|
||||||
* the queryStringList.
|
|
||||||
*
|
|
||||||
* In order to avoid round-trips per query in queryStringList,
|
|
||||||
* we join the string and send as a single command via the UDF.
|
|
||||||
* Otherwise, the executor sends each command with one
|
|
||||||
* round-trip.
|
|
||||||
*/
|
|
||||||
char *string = StringJoin(queryStringList, ';');
|
char *string = StringJoin(queryStringList, ';');
|
||||||
StringInfo commandToRun = makeStringInfo();
|
StringInfo commandToRun = makeStringInfo();
|
||||||
|
|
||||||
|
@ -586,18 +572,23 @@ CreateFixPartitionShardIndexNamesTaskList(Oid parentRelationId, Oid partitionRel
|
||||||
"SELECT pg_catalog.citus_run_local_command($$%s$$)", string);
|
"SELECT pg_catalog.citus_run_local_command($$%s$$)", string);
|
||||||
SetTaskQueryString(task, commandToRun->data);
|
SetTaskQueryString(task, commandToRun->data);
|
||||||
|
|
||||||
|
|
||||||
task->dependentTaskList = NULL;
|
task->dependentTaskList = NULL;
|
||||||
task->replicationModel = REPLICATION_MODEL_INVALID;
|
task->replicationModel = REPLICATION_MODEL_INVALID;
|
||||||
task->anchorShardId = parentShardId;
|
task->anchorShardId = parentShardId;
|
||||||
task->taskPlacementList = ActiveShardPlacementList(parentShardId);
|
task->taskPlacementList = ActiveShardPlacementList(parentShardId);
|
||||||
|
|
||||||
taskList = lappend(taskList, task);
|
bool localExecutionSupported = true;
|
||||||
|
ExecuteUtilityTaskList(list_make1(task), localExecutionSupported);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* after every iteration, clean-up all the memory associated with it */
|
||||||
|
MemoryContextReset(localContext);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
|
||||||
RelationClose(parentRelation);
|
RelationClose(parentRelation);
|
||||||
|
|
||||||
return taskList;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue