From b8008999dc137f90b3c1565141653b2a42f25f0d Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Tue, 26 Jul 2022 12:09:12 +0200 Subject: [PATCH 1/3] Reduce memory consumption while adjusting partition index names Previously, CreateFixPartitionShardIndexNames() created all the relevant query strings for all the shards, and executed the large query string. And, in terms of the memory consumption, this huge command (and its ExprContext generated while running the command) is the main bottleneck. With this change, we are reducing the total amount of memory usage to almost 1/shard_count. On my local machine, a distributed partitioned table with 120 partitions, each with 32 shards, the total memory consumption was reduced from ~3GB to ~0.1GB. And, the total execution time increased from ~28 seconds to ~30 seconds. This seems like a good trade-off. --- .../utils/multi_partitioning_utils.c | 59 ++++++++----------- 1 file changed, 25 insertions(+), 34 deletions(-) diff --git a/src/backend/distributed/utils/multi_partitioning_utils.c b/src/backend/distributed/utils/multi_partitioning_utils.c index 8a046dcd7..59876e34d 100644 --- a/src/backend/distributed/utils/multi_partitioning_utils.c +++ b/src/backend/distributed/utils/multi_partitioning_utils.c @@ -53,9 +53,9 @@ static Relation try_relation_open_nolock(Oid relationId); static List * CreateFixPartitionConstraintsTaskList(Oid relationId); static List * WorkerFixPartitionConstraintCommandList(Oid relationId, uint64 shardId, List *checkConstraintList); -static List * CreateFixPartitionShardIndexNamesTaskList(Oid parentRelationId, - Oid partitionRelationId, - Oid parentIndexOid); +static void CreateFixPartitionShardIndexNames(Oid parentRelationId, + Oid partitionRelationId, + Oid parentIndexOid); static List * WorkerFixPartitionShardIndexNamesCommandList(uint64 parentShardId, List *indexIdList, Oid partitionRelationId); @@ -329,17 +329,9 @@ FixPartitionShardIndexNames(Oid relationId, Oid parentIndexOid) 
RelationGetRelationName(relation)))); } - List *taskList = - CreateFixPartitionShardIndexNamesTaskList(parentRelationId, - partitionRelationId, - parentIndexOid); - - /* do not do anything if there are no index names to fix */ - if (taskList != NIL) - { - bool localExecutionSupported = true; - ExecuteUtilityTaskList(taskList, localExecutionSupported); - } + CreateFixPartitionShardIndexNames(parentRelationId, + partitionRelationId, + parentIndexOid); relation_close(relation, NoLock); } @@ -494,15 +486,15 @@ WorkerFixPartitionConstraintCommandList(Oid relationId, uint64 shardId, * partition each task will have parent_indexes_count query strings. When we need * to fix a single index, parent_indexes_count becomes 1. */ -static List * -CreateFixPartitionShardIndexNamesTaskList(Oid parentRelationId, Oid partitionRelationId, - Oid parentIndexOid) +static void +CreateFixPartitionShardIndexNames(Oid parentRelationId, Oid partitionRelationId, + Oid parentIndexOid) { List *partitionList = PartitionList(parentRelationId); if (partitionList == NIL) { /* early exit if the parent relation does not have any partitions */ - return NIL; + return; } Relation parentRelation = RelationIdGetRelation(parentRelationId); @@ -526,7 +518,7 @@ CreateFixPartitionShardIndexNamesTaskList(Oid parentRelationId, Oid partitionRel { /* early exit if the parent relation does not have any indexes */ RelationClose(parentRelation); - return NIL; + return; } /* @@ -554,8 +546,12 @@ CreateFixPartitionShardIndexNamesTaskList(Oid parentRelationId, Oid partitionRel /* lock metadata before getting placement lists */ LockShardListMetadata(parentShardIntervalList, ShareLock); + MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext, + "CreateFixPartitionShardIndexNames", + ALLOCSET_DEFAULT_SIZES); + MemoryContext oldContext = MemoryContextSwitchTo(localContext); + int taskId = 1; - List *taskList = NIL; ShardInterval *parentShardInterval = NULL; foreach_ptr(parentShardInterval, 
parentShardIntervalList) @@ -566,24 +562,14 @@ CreateFixPartitionShardIndexNamesTaskList(Oid parentRelationId, Oid partitionRel WorkerFixPartitionShardIndexNamesCommandList(parentShardId, parentIndexIdList, partitionRelationId); - if (queryStringList != NIL) { Task *task = CitusMakeNode(Task); task->jobId = INVALID_JOB_ID; task->taskId = taskId++; - task->taskType = DDL_TASK; - /* - * There could be O(#partitions * #indexes) queries in - * the queryStringList. - * - * In order to avoid round-trips per query in queryStringList, - * we join the string and send as a single command via the UDF. - * Otherwise, the executor sends each command with one - * round-trip. - */ + char *string = StringJoin(queryStringList, ';'); StringInfo commandToRun = makeStringInfo(); @@ -591,18 +577,23 @@ CreateFixPartitionShardIndexNamesTaskList(Oid parentRelationId, Oid partitionRel "SELECT pg_catalog.citus_run_local_command($$%s$$)", string); SetTaskQueryString(task, commandToRun->data); + task->dependentTaskList = NULL; task->replicationModel = REPLICATION_MODEL_INVALID; task->anchorShardId = parentShardId; task->taskPlacementList = ActiveShardPlacementList(parentShardId); - taskList = lappend(taskList, task); + bool localExecutionSupported = true; + ExecuteUtilityTaskList(list_make1(task), localExecutionSupported); } + + /* after every iteration, clean up all the memory associated with it */ + MemoryContextReset(localContext); } - RelationClose(parentRelation); + MemoryContextSwitchTo(oldContext); - return taskList; + RelationClose(parentRelation); } From 26fdcb68f007015797bab3070e2d2404bb42bc5c Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Tue, 26 Jul 2022 13:44:42 +0200 Subject: [PATCH 2/3] Optimize StringJoin() for when prefix-postfix is needed Before this commit, we required multiple copies of the same stringInfo if we needed to append/prepend data to the stringInfo. Now, we optionally get prefix/postfix. For large string operations, this can save up to 10% memory. 
--- src/backend/distributed/utils/listutils.c | 25 ++++++++++++++++++- .../utils/multi_partitioning_utils.c | 10 +++----- src/include/distributed/listutils.h | 2 ++ 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/src/backend/distributed/utils/listutils.c b/src/backend/distributed/utils/listutils.c index ce2920748..3279193ef 100644 --- a/src/backend/distributed/utils/listutils.c +++ b/src/backend/distributed/utils/listutils.c @@ -161,13 +161,31 @@ GeneratePositiveIntSequenceList(int upTo) /* * StringJoin gets a list of char * and then simply * returns a newly allocated char * joined with the - * given delimiter. + * given delimiter. It is a thin wrapper around + * StringJoinParams with no prefix/postfix. */ char * StringJoin(List *stringList, char delimiter) +{ + return StringJoinParams(stringList, delimiter, NULL, NULL); +} + + +/* + * StringJoinParams gets a list of char * and then simply + * returns a newly allocated char * joined with the + * given delimiter, prefix and postfix. + */ +char * +StringJoinParams(List *stringList, char delimiter, char *prefix, char *postfix) { StringInfo joinedString = makeStringInfo(); + if (prefix != NULL) + { + appendStringInfoString(joinedString, prefix); + } + const char *command = NULL; int curIndex = 0; foreach_ptr(command, stringList) @@ -180,6 +198,11 @@ StringJoin(List *stringList, char delimiter) curIndex++; } + if (postfix != NULL) + { + appendStringInfoString(joinedString, postfix); + } + return joinedString->data; } diff --git a/src/backend/distributed/utils/multi_partitioning_utils.c b/src/backend/distributed/utils/multi_partitioning_utils.c index 59876e34d..9dfa285a2 100644 --- a/src/backend/distributed/utils/multi_partitioning_utils.c +++ b/src/backend/distributed/utils/multi_partitioning_utils.c @@ -569,13 +569,11 @@ CreateFixPartitionShardIndexNames(Oid parentRelationId, Oid partitionRelationId, task->taskId = taskId++; task->taskType = DDL_TASK; + char *prefix = "SELECT pg_catalog.citus_run_local_command($$"; + char *postfix = 
"$$)"; + char *string = StringJoinParams(queryStringList, ';', prefix, postfix); - char *string = StringJoin(queryStringList, ';'); - StringInfo commandToRun = makeStringInfo(); - - appendStringInfo(commandToRun, - "SELECT pg_catalog.citus_run_local_command($$%s$$)", string); - SetTaskQueryString(task, commandToRun->data); + SetTaskQueryString(task, string); task->dependentTaskList = NULL; diff --git a/src/include/distributed/listutils.h b/src/include/distributed/listutils.h index c3facf76f..aa6a0e96b 100644 --- a/src/include/distributed/listutils.h +++ b/src/include/distributed/listutils.h @@ -166,6 +166,8 @@ extern List * SortList(List *pointerList, extern void ** PointerArrayFromList(List *pointerList); extern HTAB * ListToHashSet(List *pointerList, Size keySize, bool isStringList); extern char * StringJoin(List *stringList, char delimiter); +extern char * StringJoinParams(List *stringList, char delimiter, + char *prefix, char *postfix); extern List * ListTake(List *pointerList, int size); extern void * safe_list_nth(const List *list, int index); extern List * GeneratePositiveIntSequenceList(int upTo); From f076e811664322ce5171f22ba3251a73c0b79052 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Tue, 26 Jul 2022 15:15:35 +0200 Subject: [PATCH 3/3] Do not cache all the metadata during fix_all_partition_shard_index_names --- src/backend/distributed/utils/multi_partitioning_utils.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/backend/distributed/utils/multi_partitioning_utils.c b/src/backend/distributed/utils/multi_partitioning_utils.c index 9dfa285a2..3ec02da48 100644 --- a/src/backend/distributed/utils/multi_partitioning_utils.c +++ b/src/backend/distributed/utils/multi_partitioning_utils.c @@ -205,6 +205,13 @@ fix_partition_shard_index_names(PG_FUNCTION_ARGS) FixPartitionShardIndexNames(relationId, parentIndexOid); + /* + * This UDF is called from fix_all_partition_shard_index_names() which iterates + * over all the partitioned tables. 
There is no need to hold all the distributed + * table metadata until the end of the transaction for the input table. + */ + CitusTableCacheFlushInvalidatedEntries(); + PG_RETURN_VOID(); }