From 40b52ab757cbe7c2354b57c9c64719bcaf6ecaf2 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Tue, 17 Nov 2020 23:06:32 -0800 Subject: [PATCH] Fix memory leaks in column store --- src/backend/columnar/cstore_debug.c | 99 ++++++++++++++++ src/backend/columnar/cstore_tableam.c | 14 +++ src/backend/columnar/cstore_writer.c | 8 +- src/backend/columnar/write_state_management.c | 19 +++- src/include/columnar/cstore.h | 2 + src/test/regress/columnar_am_schedule | 1 + src/test/regress/expected/am_memory.out | 106 ++++++++++++++++++ src/test/regress/sql/am_memory.sql | 102 +++++++++++++++++ 8 files changed, 348 insertions(+), 3 deletions(-) create mode 100644 src/backend/columnar/cstore_debug.c create mode 100644 src/test/regress/expected/am_memory.out create mode 100644 src/test/regress/sql/am_memory.sql diff --git a/src/backend/columnar/cstore_debug.c b/src/backend/columnar/cstore_debug.c new file mode 100644 index 000000000..e6e289e4f --- /dev/null +++ b/src/backend/columnar/cstore_debug.c @@ -0,0 +1,99 @@ +/*------------------------------------------------------------------------- + * + * cstore_debug.c + * + * Helper functions to debug column store. 
+ * + *------------------------------------------------------------------------- + */ + + +#include "postgres.h" + +#include "pg_config.h" +#include "access/nbtree.h" +#include "catalog/pg_am.h" +#include "catalog/pg_type.h" +#include "distributed/pg_version_constants.h" +#include "distributed/tuplestore.h" +#include "miscadmin.h" +#include "storage/fd.h" +#include "storage/smgr.h" +#include "utils/guc.h" +#include "utils/memutils.h" +#include "utils/rel.h" +#include "utils/tuplestore.h" + +#include "columnar/cstore.h" +#include "columnar/cstore_version_compat.h" + +static void MemoryContextTotals(MemoryContext context, MemoryContextCounters *counters); + +PG_FUNCTION_INFO_V1(column_store_memory_stats); + + +/* + * column_store_memory_stats returns a record of 3 values: size of + * TopMemoryContext, TopTransactionContext, and Write State context. + */ +Datum +column_store_memory_stats(PG_FUNCTION_ARGS) +{ + TupleDesc tupleDescriptor = NULL; + const int resultColumnCount = 3; + +#if PG_VERSION_NUM >= PG_VERSION_12 + tupleDescriptor = CreateTemplateTupleDesc(resultColumnCount); +#else + tupleDescriptor = CreateTemplateTupleDesc(resultColumnCount, false); +#endif + + TupleDescInitEntry(tupleDescriptor, (AttrNumber) 1, "TopMemoryContext", + INT8OID, -1, 0); + TupleDescInitEntry(tupleDescriptor, (AttrNumber) 2, "TopTransactionContext", + INT8OID, -1, 0); + TupleDescInitEntry(tupleDescriptor, (AttrNumber) 3, "WriteStateContext", + INT8OID, -1, 0); + + MemoryContextCounters transactionCounters = { 0 }; + MemoryContextCounters topCounters = { 0 }; + MemoryContextCounters writeStateCounters = { 0 }; + MemoryContextTotals(TopTransactionContext, &transactionCounters); + MemoryContextTotals(TopMemoryContext, &topCounters); + MemoryContextTotals(GetWriteContextForDebug(), &writeStateCounters); + + bool nulls[3] = { false }; + Datum values[3] = { + Int64GetDatum(topCounters.totalspace), + Int64GetDatum(transactionCounters.totalspace), + 
Int64GetDatum(writeStateCounters.totalspace) + }; + + Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); + tuplestore_putvalues(tupleStore, tupleDescriptor, values, nulls); + tuplestore_donestoring(tupleStore); + + PG_RETURN_DATUM(0); +} + + +/* + * MemoryContextTotals adds stats of the given memory context and its + * subtree to the given counters. + */ +static void +MemoryContextTotals(MemoryContext context, MemoryContextCounters *counters) +{ + if (context == NULL) + { + return; + } + + MemoryContext child; + for (child = context->firstchild; child != NULL; child = child->nextchild) + { + MemoryContextTotals(child, counters); + } + + context->methods->stats(context, NULL, NULL, counters); +} diff --git a/src/backend/columnar/cstore_tableam.c b/src/backend/columnar/cstore_tableam.c index 92a3f09e7..5dc865004 100644 --- a/src/backend/columnar/cstore_tableam.c +++ b/src/backend/columnar/cstore_tableam.c @@ -41,6 +41,7 @@ #include "storage/smgr.h" #include "tcop/utility.h" #include "utils/builtins.h" +#include "utils/memutils.h" #include "utils/pg_rusage.h" #include "utils/rel.h" #include "utils/syscache.h" @@ -445,10 +446,16 @@ static void cstore_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid, int options, BulkInsertState bistate) { + /* + * cstore_init_write_state allocates the write state in a longer + * lasting context, so no need to worry about it. 
+ */ TableWriteState *writeState = cstore_init_write_state(relation->rd_node, RelationGetDescr(relation), GetCurrentSubTransactionId()); + MemoryContext oldContext = MemoryContextSwitchTo(writeState->perTupleContext); + HeapTuple heapTuple = ExecCopySlotHeapTuple(slot); if (HeapTupleHasExternal(heapTuple)) { @@ -462,6 +469,9 @@ cstore_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid, slot_getallattrs(slot); CStoreWriteRow(writeState, slot->tts_values, slot->tts_isnull); + + MemoryContextSwitchTo(oldContext); + MemoryContextReset(writeState->perTupleContext); } @@ -493,6 +503,7 @@ cstore_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, for (int i = 0; i < ntuples; i++) { TupleTableSlot *tupleSlot = slots[i]; + MemoryContext oldContext = MemoryContextSwitchTo(writeState->perTupleContext); HeapTuple heapTuple = ExecCopySlotHeapTuple(tupleSlot); if (HeapTupleHasExternal(heapTuple)) @@ -507,7 +518,10 @@ cstore_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, slot_getallattrs(tupleSlot); CStoreWriteRow(writeState, tupleSlot->tts_values, tupleSlot->tts_isnull); + MemoryContextSwitchTo(oldContext); } + + MemoryContextReset(writeState->perTupleContext); } diff --git a/src/backend/columnar/cstore_writer.c b/src/backend/columnar/cstore_writer.c index 75629241d..ca0958ecf 100644 --- a/src/backend/columnar/cstore_writer.c +++ b/src/backend/columnar/cstore_writer.c @@ -24,6 +24,7 @@ #include "miscadmin.h" #include "storage/fd.h" #include "storage/smgr.h" +#include "utils/guc.h" #include "utils/memutils.h" #include "utils/rel.h" #include "utils/relfilenodemap.h" @@ -51,7 +52,6 @@ static void UpdateBlockSkipNodeMinMax(ColumnBlockSkipNode *blockSkipNode, static Datum DatumCopy(Datum datum, bool datumTypeByValue, int datumTypeLength); static StringInfo CopyStringInfo(StringInfo sourceString); - /* * CStoreBeginWrite initializes a cstore data load operation and returns a table * handle. 
This handle should be used for adding the row values and finishing the @@ -114,6 +114,9 @@ CStoreBeginWrite(RelFileNode relfilenode, writeState->stripeWriteContext = stripeWriteContext; writeState->blockData = blockData; writeState->compressionBuffer = NULL; + writeState->perTupleContext = AllocSetContextCreate(MemoryContextGetParent(stripeWriteContext), + "CStore per tuple context", + ALLOCSET_DEFAULT_SIZES); /* sibling of stripeWriteContext: MemoryContextReset() on the stripe context deletes its child contexts */ return writeState; } @@ -241,6 +244,7 @@ CStoreFlushPendingWrites(TableWriteState *writeState) MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext); FlushStripe(writeState); + MemoryContextReset(writeState->stripeWriteContext); /* set stripe data and skip list to NULL so they are recreated next time */ writeState->stripeBuffers = NULL; @@ -417,6 +421,8 @@ FlushStripe(TableWriteState *writeState) uint64 stripeSize = 0; uint64 stripeRowCount = 0; + elog(DEBUG1, "Flushing Stripe of size %d", stripeBuffers->rowCount); + Oid relationId = RelidByRelfilenode(writeState->relfilenode.spcNode, writeState->relfilenode.relNode); Relation relation = relation_open(relationId, NoLock); diff --git a/src/backend/columnar/write_state_management.c b/src/backend/columnar/write_state_management.c index 30b93a5d0..af56775f2 100644 --- a/src/backend/columnar/write_state_management.c +++ b/src/backend/columnar/write_state_management.c @@ -1,8 +1,10 @@ #include "citus_version.h" -#if HAS_TABLEAM #include "postgres.h" +#include "columnar/cstore.h" + +#if HAS_TABLEAM #include @@ -45,7 +47,6 @@ #include "utils/rel.h" #include "utils/syscache.h" -#include "columnar/cstore.h" #include "columnar/cstore_customscan.h" #include "columnar/cstore_tableam.h" #include "columnar/cstore_version_compat.h" @@ -378,3 +379,17 @@ PendingWritesInUpperTransactions(Oid relfilenode, SubTransactionId currentSubXid #endif + +/* + * GetWriteContextForDebug exposes WriteStateContext for debugging + * purposes. 
+ */ +extern MemoryContext +GetWriteContextForDebug(void) +{ +#if HAS_TABLEAM + return WriteStateContext; +#else + return NULL; +#endif +} diff --git a/src/include/columnar/cstore.h b/src/include/columnar/cstore.h index edfe65efe..1f18fbc55 100644 --- a/src/include/columnar/cstore.h +++ b/src/include/columnar/cstore.h @@ -228,6 +228,7 @@ typedef struct TableWriteState RelFileNode relfilenode; MemoryContext stripeWriteContext; + MemoryContext perTupleContext; StripeBuffers *stripeBuffers; StripeSkipList *stripeSkipList; uint32 stripeMaxRowCount; @@ -319,6 +320,7 @@ extern void MarkRelfilenodeDropped(Oid relfilenode, SubTransactionId currentSubX extern void NonTransactionDropWriteState(Oid relfilenode); extern bool PendingWritesInUpperTransactions(Oid relfilenode, SubTransactionId currentSubXid); +extern MemoryContext GetWriteContextForDebug(void); typedef struct SmgrAddr { diff --git a/src/test/regress/columnar_am_schedule b/src/test/regress/columnar_am_schedule index 1519da29f..39fcd78bb 100644 --- a/src/test/regress/columnar_am_schedule +++ b/src/test/regress/columnar_am_schedule @@ -19,3 +19,4 @@ test: am_tableoptions test: am_recursive test: am_transactions test: am_matview +test: am_memory diff --git a/src/test/regress/expected/am_memory.out b/src/test/regress/expected/am_memory.out new file mode 100644 index 000000000..8a4b038c6 --- /dev/null +++ b/src/test/regress/expected/am_memory.out @@ -0,0 +1,106 @@ +-- +-- Testing memory usage of columnar tables. 
+-- +CREATE SCHEMA columnar_memory; +SET search_path TO 'columnar_memory'; +CREATE OR REPLACE FUNCTION column_store_memory_stats() + RETURNS TABLE(TopMemoryContext BIGINT, + TopTransactionContext BIGINT, + WriteStateContext BIGINT) + LANGUAGE C STRICT VOLATILE + AS 'citus', $$column_store_memory_stats$$; +CREATE FUNCTION top_memory_context_usage() + RETURNS BIGINT AS $$ + SELECT TopMemoryContext FROM column_store_memory_stats(); + $$ LANGUAGE SQL VOLATILE; +SET cstore.stripe_row_count TO 50000; +SET cstore.compression TO 'pglz'; +CREATE TABLE t (a int, tag text, memusage bigint) USING columnar; +-- measure memory before doing writes +SELECT TopMemoryContext as top_pre, + WriteStateContext write_pre +FROM column_store_memory_stats() \gset +BEGIN; +SET LOCAL client_min_messages TO DEBUG1; +-- measure memory just before flushing 1st stripe +INSERT INTO t + SELECT i, 'first batch', + -- sample memusage instead of recording everyr row for speed + CASE WHEN i % 100 = 0 THEN top_memory_context_usage() ELSE 0 END + FROM generate_series(1, 49999) i; +SELECT TopMemoryContext as top0, + TopTransactionContext xact0, + WriteStateContext write0 +FROM column_store_memory_stats() \gset +-- flush 1st stripe, and measure memory just before flushing 2nd stripe +INSERT INTO t + SELECT i, 'second batch', 0 /* no need to record memusage per row */ + FROM generate_series(1, 50000) i; +DEBUG: Flushing Stripe of size 50000 +SELECT TopMemoryContext as top1, + TopTransactionContext xact1, + WriteStateContext write1 +FROM column_store_memory_stats() \gset +-- flush 2nd stripe, and measure memory just before flushing 3rd stripe +INSERT INTO t + SELECT i, 'third batch', 0 /* no need to record memusage per row */ + FROM generate_series(1, 50000) i; +DEBUG: Flushing Stripe of size 50000 +SELECT TopMemoryContext as top2, + TopTransactionContext xact2, + WriteStateContext write2 +FROM column_store_memory_stats() \gset +-- insert a large batch +INSERT INTO t + SELECT i, 'large batch', + -- sample 
memusage instead of recording everyr row for speed + CASE WHEN i % 100 = 0 THEN top_memory_context_usage() ELSE 0 END + FROM generate_series(1, 100000) i; +DEBUG: Flushing Stripe of size 50000 +DEBUG: Flushing Stripe of size 50000 +COMMIT; +DEBUG: Flushing Stripe of size 49999 +-- measure memory after doing writes +SELECT TopMemoryContext as top_post, + WriteStateContext write_post +FROM column_store_memory_stats() \gset +\x +SELECT (1.0 * :top2/:top1 BETWEEN 0.99 AND 1.01) AS top_growth_ok, + (1.0 * :xact1/:xact0 BETWEEN 0.99 AND 1.01) AND + (1.0 * :xact2/:xact0 BETWEEN 0.99 AND 1.01) AS xact_growth_ok, + (1.0 * :write1/:write0 BETWEEN 0.99 AND 1.01) AND + (1.0 * :write2/:write0 BETWEEN 0.99 AND 1.01) AS write_growth_ok, + :write_pre = 0 AND :write_post = 0 AS write_clear_outside_xact; +-[ RECORD 1 ]------------+-- +top_growth_ok | t +xact_growth_ok | t +write_growth_ok | t +write_clear_outside_xact | t + +-- inserting another bunch of rows should not grow top memory context +INSERT INTO t + SELECT i, 'last batch', 0 /* no need to record memusage per row */ + FROM generate_series(1, 50000) i; +SELECT 1.0 * TopMemoryContext / :top_post BETWEEN 0.99 AND 1.01 AS top_growth_ok +FROM column_store_memory_stats(); +-[ RECORD 1 ]-+-- +top_growth_ok | t + +-- before this change, max mem usage while executing inserts was 28MB and +-- with this change it's less than 8MB. 
+SELECT + (SELECT max(memusage) < 8 * 1024 * 1024 FROM t WHERE tag='large batch') AS large_batch_ok, + (SELECT max(memusage) < 8 * 1024 * 1024 FROM t WHERE tag='first batch') AS first_batch_ok; +-[ RECORD 1 ]--+-- +large_batch_ok | t +first_batch_ok | t + +\x +SELECT count(*) FROM t; + count +--------------------------------------------------------------------- + 299999 +(1 row) + +SET client_min_messages TO WARNING; +DROP SCHEMA columnar_memory CASCADE; diff --git a/src/test/regress/sql/am_memory.sql b/src/test/regress/sql/am_memory.sql new file mode 100644 index 000000000..6ce8fcf18 --- /dev/null +++ b/src/test/regress/sql/am_memory.sql @@ -0,0 +1,102 @@ +-- +-- Testing memory usage of columnar tables. +-- + +CREATE SCHEMA columnar_memory; +SET search_path TO 'columnar_memory'; + +CREATE OR REPLACE FUNCTION column_store_memory_stats() + RETURNS TABLE(TopMemoryContext BIGINT, + TopTransactionContext BIGINT, + WriteStateContext BIGINT) + LANGUAGE C STRICT VOLATILE + AS 'citus', $$column_store_memory_stats$$; + +CREATE FUNCTION top_memory_context_usage() + RETURNS BIGINT AS $$ + SELECT TopMemoryContext FROM column_store_memory_stats(); + $$ LANGUAGE SQL VOLATILE; + +SET cstore.stripe_row_count TO 50000; +SET cstore.compression TO 'pglz'; +CREATE TABLE t (a int, tag text, memusage bigint) USING columnar; + +-- measure memory before doing writes +SELECT TopMemoryContext as top_pre, + WriteStateContext write_pre +FROM column_store_memory_stats() \gset + +BEGIN; +SET LOCAL client_min_messages TO DEBUG1; + +-- measure memory just before flushing 1st stripe +INSERT INTO t + SELECT i, 'first batch', + -- sample memusage instead of recording everyr row for speed + CASE WHEN i % 100 = 0 THEN top_memory_context_usage() ELSE 0 END + FROM generate_series(1, 49999) i; +SELECT TopMemoryContext as top0, + TopTransactionContext xact0, + WriteStateContext write0 +FROM column_store_memory_stats() \gset + +-- flush 1st stripe, and measure memory just before flushing 2nd stripe +INSERT 
INTO t + SELECT i, 'second batch', 0 /* no need to record memusage per row */ + FROM generate_series(1, 50000) i; +SELECT TopMemoryContext as top1, + TopTransactionContext xact1, + WriteStateContext write1 +FROM column_store_memory_stats() \gset + +-- flush 2nd stripe, and measure memory just before flushing 3rd stripe +INSERT INTO t + SELECT i, 'third batch', 0 /* no need to record memusage per row */ + FROM generate_series(1, 50000) i; +SELECT TopMemoryContext as top2, + TopTransactionContext xact2, + WriteStateContext write2 +FROM column_store_memory_stats() \gset + +-- insert a large batch +INSERT INTO t + SELECT i, 'large batch', + -- sample memusage instead of recording everyr row for speed + CASE WHEN i % 100 = 0 THEN top_memory_context_usage() ELSE 0 END + FROM generate_series(1, 100000) i; + +COMMIT; + +-- measure memory after doing writes +SELECT TopMemoryContext as top_post, + WriteStateContext write_post +FROM column_store_memory_stats() \gset + +\x +SELECT (1.0 * :top2/:top1 BETWEEN 0.99 AND 1.01) AS top_growth_ok, + (1.0 * :xact1/:xact0 BETWEEN 0.99 AND 1.01) AND + (1.0 * :xact2/:xact0 BETWEEN 0.99 AND 1.01) AS xact_growth_ok, + (1.0 * :write1/:write0 BETWEEN 0.99 AND 1.01) AND + (1.0 * :write2/:write0 BETWEEN 0.99 AND 1.01) AS write_growth_ok, + :write_pre = 0 AND :write_post = 0 AS write_clear_outside_xact; + +-- inserting another bunch of rows should not grow top memory context +INSERT INTO t + SELECT i, 'last batch', 0 /* no need to record memusage per row */ + FROM generate_series(1, 50000) i; + +SELECT 1.0 * TopMemoryContext / :top_post BETWEEN 0.99 AND 1.01 AS top_growth_ok +FROM column_store_memory_stats(); + +-- before this change, max mem usage while executing inserts was 28MB and +-- with this change it's less than 8MB. 
+SELECT + (SELECT max(memusage) < 8 * 1024 * 1024 FROM t WHERE tag='large batch') AS large_batch_ok, + (SELECT max(memusage) < 8 * 1024 * 1024 FROM t WHERE tag='first batch') AS first_batch_ok; + +\x + +SELECT count(*) FROM t; + +SET client_min_messages TO WARNING; +DROP SCHEMA columnar_memory CASCADE;