Merge pull request #6086 from citusdata/use_less_mem_for_index_fix

Reduce memory consumption of index name fix for partitioned tables
pull/6066/head
Önder Kalacı 2022-07-27 10:01:08 +02:00 committed by GitHub
commit 9332a53088
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 61 additions and 40 deletions

View File

@ -161,13 +161,31 @@ GeneratePositiveIntSequenceList(int upTo)
/*
* StringJoin gets a list of char * and then simply
* returns a newly allocated char * joined with the
* given delimiter.
* given delimiter. It uses ';' as the delimiter by
* default.
*/
char *
StringJoin(List *stringList, char delimiter)
{
return StringJoinParams(stringList, delimiter, NULL, NULL);
}
/*
* StringJoin gets a list of char * and then simply
* returns a newly allocated char * joined with the
* given delimiter, prefix and postfix.
*/
char *
StringJoinParams(List *stringList, char delimiter, char *prefix, char *postfix)
{
StringInfo joinedString = makeStringInfo();
if (prefix != NULL)
{
appendStringInfoString(joinedString, prefix);
}
const char *command = NULL;
int curIndex = 0;
foreach_ptr(command, stringList)
@ -180,6 +198,11 @@ StringJoin(List *stringList, char delimiter)
curIndex++;
}
if (postfix != NULL)
{
appendStringInfoString(joinedString, postfix);
}
return joinedString->data;
}

View File

@ -53,7 +53,7 @@ static Relation try_relation_open_nolock(Oid relationId);
static List * CreateFixPartitionConstraintsTaskList(Oid relationId);
static List * WorkerFixPartitionConstraintCommandList(Oid relationId, uint64 shardId,
List *checkConstraintList);
static List * CreateFixPartitionShardIndexNamesTaskList(Oid parentRelationId,
static void CreateFixPartitionShardIndexNames(Oid parentRelationId,
Oid partitionRelationId,
Oid parentIndexOid);
static List * WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId,
@ -205,6 +205,13 @@ fix_partition_shard_index_names(PG_FUNCTION_ARGS)
FixPartitionShardIndexNames(relationId, parentIndexOid);
/*
* This UDF is called from fix_all_partition_shard_index_names() which iterates
* over all the partitioned tables. There is no need to hold all the distributed
* table metadata until the end of the transaction for the input table.
*/
CitusTableCacheFlushInvalidatedEntries();
PG_RETURN_VOID();
}
@ -329,18 +336,10 @@ FixPartitionShardIndexNames(Oid relationId, Oid parentIndexOid)
RelationGetRelationName(relation))));
}
List *taskList =
CreateFixPartitionShardIndexNamesTaskList(parentRelationId,
CreateFixPartitionShardIndexNames(parentRelationId,
partitionRelationId,
parentIndexOid);
/* do not do anything if there are no index names to fix */
if (taskList != NIL)
{
bool localExecutionSupported = true;
ExecuteUtilityTaskList(taskList, localExecutionSupported);
}
relation_close(relation, NoLock);
}
@ -494,15 +493,15 @@ WorkerFixPartitionConstraintCommandList(Oid relationId, uint64 shardId,
* partition each task will have parent_indexes_count query strings. When we need
* to fix a single index, parent_indexes_count becomes 1.
*/
static List *
CreateFixPartitionShardIndexNamesTaskList(Oid parentRelationId, Oid partitionRelationId,
static void
CreateFixPartitionShardIndexNames(Oid parentRelationId, Oid partitionRelationId,
Oid parentIndexOid)
{
List *partitionList = PartitionList(parentRelationId);
if (partitionList == NIL)
{
/* early exit if the parent relation does not have any partitions */
return NIL;
return;
}
Relation parentRelation = RelationIdGetRelation(parentRelationId);
@ -526,7 +525,7 @@ CreateFixPartitionShardIndexNamesTaskList(Oid parentRelationId, Oid partitionRel
{
/* early exit if the parent relation does not have any indexes */
RelationClose(parentRelation);
return NIL;
return;
}
/*
@ -554,8 +553,12 @@ CreateFixPartitionShardIndexNamesTaskList(Oid parentRelationId, Oid partitionRel
/* lock metadata before getting placement lists */
LockShardListMetadata(parentShardIntervalList, ShareLock);
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
"CreateFixPartitionShardIndexNames",
ALLOCSET_DEFAULT_SIZES);
MemoryContext oldContext = MemoryContextSwitchTo(localContext);
int taskId = 1;
List *taskList = NIL;
ShardInterval *parentShardInterval = NULL;
foreach_ptr(parentShardInterval, parentShardIntervalList)
@ -566,43 +569,36 @@ CreateFixPartitionShardIndexNamesTaskList(Oid parentRelationId, Oid partitionRel
WorkerFixPartitionShardIndexNamesCommandList(parentShardId,
parentIndexIdList,
partitionRelationId);
if (queryStringList != NIL)
{
Task *task = CitusMakeNode(Task);
task->jobId = INVALID_JOB_ID;
task->taskId = taskId++;
task->taskType = DDL_TASK;
/*
* There could be O(#partitions * #indexes) queries in
* the queryStringList.
*
* In order to avoid round-trips per query in queryStringList,
* we join the string and send as a single command via the UDF.
* Otherwise, the executor sends each command with one
* round-trip.
*/
char *string = StringJoin(queryStringList, ';');
StringInfo commandToRun = makeStringInfo();
char *prefix = "SELECT pg_catalog.citus_run_local_command($$";
char *postfix = "$$)";
char *string = StringJoinParams(queryStringList, ';', prefix, postfix);
SetTaskQueryString(task, string);
appendStringInfo(commandToRun,
"SELECT pg_catalog.citus_run_local_command($$%s$$)", string);
SetTaskQueryString(task, commandToRun->data);
task->dependentTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->anchorShardId = parentShardId;
task->taskPlacementList = ActiveShardPlacementList(parentShardId);
taskList = lappend(taskList, task);
bool localExecutionSupported = true;
ExecuteUtilityTaskList(list_make1(task), localExecutionSupported);
}
/* after every iteration, clean-up all the memory associated with it */
MemoryContextReset(localContext);
}
MemoryContextSwitchTo(oldContext);
RelationClose(parentRelation);
return taskList;
}

View File

@ -166,6 +166,8 @@ extern List * SortList(List *pointerList,
extern void ** PointerArrayFromList(List *pointerList);
extern HTAB * ListToHashSet(List *pointerList, Size keySize, bool isStringList);
extern char * StringJoin(List *stringList, char delimiter);
extern char * StringJoinParams(List *stringList, char delimiter,
char *prefix, char *postfix);
extern List * ListTake(List *pointerList, int size);
extern void * safe_list_nth(const List *list, int index);
extern List * GeneratePositiveIntSequenceList(int upTo);