mirror of https://github.com/citusdata/citus.git
Merge pull request #4336 from citusdata/cstore_memory_leaks
Fix memory leaks in column storecstore_storageid
commit
fc0ef8abba
|
@ -0,0 +1,99 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* cstore_debug.c
|
||||
*
|
||||
* Helper functions to debug column store.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
#include "pg_config.h"
|
||||
#include "access/nbtree.h"
|
||||
#include "catalog/pg_am.h"
|
||||
#include "catalog/pg_type.h"
|
||||
#include "distributed/pg_version_constants.h"
|
||||
#include "distributed/tuplestore.h"
|
||||
#include "miscadmin.h"
|
||||
#include "storage/fd.h"
|
||||
#include "storage/smgr.h"
|
||||
#include "utils/guc.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "utils/rel.h"
|
||||
#include "utils/tuplestore.h"
|
||||
|
||||
#include "columnar/cstore.h"
|
||||
#include "columnar/cstore_version_compat.h"
|
||||
|
||||
static void MemoryContextTotals(MemoryContext context, MemoryContextCounters *counters);
|
||||
|
||||
PG_FUNCTION_INFO_V1(column_store_memory_stats);
|
||||
|
||||
|
||||
/*
|
||||
* column_store_memory_stats returns a record of 3 values: size of
|
||||
* TopMemoryContext, TopTransactionContext, and Write State context.
|
||||
*/
|
||||
Datum
|
||||
column_store_memory_stats(PG_FUNCTION_ARGS)
|
||||
{
|
||||
TupleDesc tupleDescriptor = NULL;
|
||||
const int resultColumnCount = 3;
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_12
|
||||
tupleDescriptor = CreateTemplateTupleDesc(resultColumnCount);
|
||||
#else
|
||||
tupleDescriptor = CreateTemplateTupleDesc(resultColumnCount, false);
|
||||
#endif
|
||||
|
||||
TupleDescInitEntry(tupleDescriptor, (AttrNumber) 1, "TopMemoryContext",
|
||||
INT8OID, -1, 0);
|
||||
TupleDescInitEntry(tupleDescriptor, (AttrNumber) 2, "TopTransactionContext",
|
||||
INT8OID, -1, 0);
|
||||
TupleDescInitEntry(tupleDescriptor, (AttrNumber) 3, "WriteStateContext",
|
||||
INT8OID, -1, 0);
|
||||
|
||||
MemoryContextCounters transactionCounters = { 0 };
|
||||
MemoryContextCounters topCounters = { 0 };
|
||||
MemoryContextCounters writeStateCounters = { 0 };
|
||||
MemoryContextTotals(TopTransactionContext, &transactionCounters);
|
||||
MemoryContextTotals(TopMemoryContext, &topCounters);
|
||||
MemoryContextTotals(GetWriteContextForDebug(), &writeStateCounters);
|
||||
|
||||
bool nulls[3] = { false };
|
||||
Datum values[3] = {
|
||||
Int64GetDatum(topCounters.totalspace),
|
||||
Int64GetDatum(transactionCounters.totalspace),
|
||||
Int64GetDatum(writeStateCounters.totalspace)
|
||||
};
|
||||
|
||||
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
|
||||
tuplestore_putvalues(tupleStore, tupleDescriptor, values, nulls);
|
||||
tuplestore_donestoring(tupleStore);
|
||||
|
||||
PG_RETURN_DATUM(0);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* MemoryContextTotals adds stats of the given memory context and its
|
||||
* subtree to the given counters.
|
||||
*/
|
||||
static void
|
||||
MemoryContextTotals(MemoryContext context, MemoryContextCounters *counters)
|
||||
{
|
||||
if (context == NULL)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
MemoryContext child;
|
||||
for (child = context->firstchild; child != NULL; child = child->nextchild)
|
||||
{
|
||||
MemoryContextTotals(child, counters);
|
||||
}
|
||||
|
||||
context->methods->stats(context, NULL, NULL, counters);
|
||||
}
|
|
@ -41,6 +41,7 @@
|
|||
#include "storage/smgr.h"
|
||||
#include "tcop/utility.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "utils/pg_rusage.h"
|
||||
#include "utils/rel.h"
|
||||
#include "utils/syscache.h"
|
||||
|
@ -445,10 +446,16 @@ static void
|
|||
cstore_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
|
||||
int options, BulkInsertState bistate)
|
||||
{
|
||||
/*
|
||||
* cstore_init_write_state allocates the write state in a longer
|
||||
* lasting context, so no need to worry about it.
|
||||
*/
|
||||
TableWriteState *writeState = cstore_init_write_state(relation->rd_node,
|
||||
RelationGetDescr(relation),
|
||||
GetCurrentSubTransactionId());
|
||||
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(writeState->perTupleContext);
|
||||
|
||||
HeapTuple heapTuple = ExecCopySlotHeapTuple(slot);
|
||||
if (HeapTupleHasExternal(heapTuple))
|
||||
{
|
||||
|
@ -462,6 +469,9 @@ cstore_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
|
|||
slot_getallattrs(slot);
|
||||
|
||||
CStoreWriteRow(writeState, slot->tts_values, slot->tts_isnull);
|
||||
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
MemoryContextReset(writeState->perTupleContext);
|
||||
}
|
||||
|
||||
|
||||
|
@ -493,6 +503,7 @@ cstore_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
|
|||
for (int i = 0; i < ntuples; i++)
|
||||
{
|
||||
TupleTableSlot *tupleSlot = slots[i];
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(writeState->perTupleContext);
|
||||
HeapTuple heapTuple = ExecCopySlotHeapTuple(tupleSlot);
|
||||
|
||||
if (HeapTupleHasExternal(heapTuple))
|
||||
|
@ -507,7 +518,10 @@ cstore_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
|
|||
slot_getallattrs(tupleSlot);
|
||||
|
||||
CStoreWriteRow(writeState, tupleSlot->tts_values, tupleSlot->tts_isnull);
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
}
|
||||
|
||||
MemoryContextReset(writeState->perTupleContext);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
#include "miscadmin.h"
|
||||
#include "storage/fd.h"
|
||||
#include "storage/smgr.h"
|
||||
#include "utils/guc.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "utils/rel.h"
|
||||
#include "utils/relfilenodemap.h"
|
||||
|
@ -51,7 +52,6 @@ static void UpdateBlockSkipNodeMinMax(ColumnBlockSkipNode *blockSkipNode,
|
|||
static Datum DatumCopy(Datum datum, bool datumTypeByValue, int datumTypeLength);
|
||||
static StringInfo CopyStringInfo(StringInfo sourceString);
|
||||
|
||||
|
||||
/*
|
||||
* CStoreBeginWrite initializes a cstore data load operation and returns a table
|
||||
* handle. This handle should be used for adding the row values and finishing the
|
||||
|
@ -114,6 +114,9 @@ CStoreBeginWrite(RelFileNode relfilenode,
|
|||
writeState->stripeWriteContext = stripeWriteContext;
|
||||
writeState->blockData = blockData;
|
||||
writeState->compressionBuffer = NULL;
|
||||
writeState->perTupleContext = AllocSetContextCreate(stripeWriteContext,
|
||||
"CStore per tuple context",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
|
||||
return writeState;
|
||||
}
|
||||
|
@ -241,6 +244,7 @@ CStoreFlushPendingWrites(TableWriteState *writeState)
|
|||
MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext);
|
||||
|
||||
FlushStripe(writeState);
|
||||
MemoryContextReset(writeState->stripeWriteContext);
|
||||
|
||||
/* set stripe data and skip list to NULL so they are recreated next time */
|
||||
writeState->stripeBuffers = NULL;
|
||||
|
@ -417,6 +421,8 @@ FlushStripe(TableWriteState *writeState)
|
|||
uint64 stripeSize = 0;
|
||||
uint64 stripeRowCount = 0;
|
||||
|
||||
elog(DEBUG1, "Flushing Stripe of size %d", stripeBuffers->rowCount);
|
||||
|
||||
Oid relationId = RelidByRelfilenode(writeState->relfilenode.spcNode,
|
||||
writeState->relfilenode.relNode);
|
||||
Relation relation = relation_open(relationId, NoLock);
|
||||
|
|
|
@ -1,8 +1,10 @@
|
|||
|
||||
#include "citus_version.h"
|
||||
#if HAS_TABLEAM
|
||||
|
||||
#include "postgres.h"
|
||||
#include "columnar/cstore.h"
|
||||
|
||||
#if HAS_TABLEAM
|
||||
|
||||
#include <math.h>
|
||||
|
||||
|
@ -45,7 +47,6 @@
|
|||
#include "utils/rel.h"
|
||||
#include "utils/syscache.h"
|
||||
|
||||
#include "columnar/cstore.h"
|
||||
#include "columnar/cstore_customscan.h"
|
||||
#include "columnar/cstore_tableam.h"
|
||||
#include "columnar/cstore_version_compat.h"
|
||||
|
@ -378,3 +379,17 @@ PendingWritesInUpperTransactions(Oid relfilenode, SubTransactionId currentSubXid
|
|||
|
||||
|
||||
#endif
|
||||
|
||||
/*
|
||||
* GetWriteContextForDebug exposes WriteStateContext for debugging
|
||||
* purposes.
|
||||
*/
|
||||
extern MemoryContext
|
||||
GetWriteContextForDebug(void)
|
||||
{
|
||||
#if HAS_TABLEAM
|
||||
return WriteStateContext;
|
||||
#else
|
||||
return NULL;
|
||||
#endif
|
||||
}
|
||||
|
|
|
@ -228,6 +228,7 @@ typedef struct TableWriteState
|
|||
RelFileNode relfilenode;
|
||||
|
||||
MemoryContext stripeWriteContext;
|
||||
MemoryContext perTupleContext;
|
||||
StripeBuffers *stripeBuffers;
|
||||
StripeSkipList *stripeSkipList;
|
||||
uint32 stripeMaxRowCount;
|
||||
|
@ -319,6 +320,7 @@ extern void MarkRelfilenodeDropped(Oid relfilenode, SubTransactionId currentSubX
|
|||
extern void NonTransactionDropWriteState(Oid relfilenode);
|
||||
extern bool PendingWritesInUpperTransactions(Oid relfilenode,
|
||||
SubTransactionId currentSubXid);
|
||||
extern MemoryContext GetWriteContextForDebug(void);
|
||||
|
||||
typedef struct SmgrAddr
|
||||
{
|
||||
|
|
|
@ -19,3 +19,4 @@ test: am_tableoptions
|
|||
test: am_recursive
|
||||
test: am_transactions
|
||||
test: am_matview
|
||||
test: am_memory
|
||||
|
|
|
@ -0,0 +1,106 @@
|
|||
--
|
||||
-- Testing memory usage of columnar tables.
|
||||
--
|
||||
CREATE SCHEMA columnar_memory;
|
||||
SET search_path TO 'columnar_memory';
|
||||
CREATE OR REPLACE FUNCTION column_store_memory_stats()
|
||||
RETURNS TABLE(TopMemoryContext BIGINT,
|
||||
TopTransactionContext BIGINT,
|
||||
WriteStateContext BIGINT)
|
||||
LANGUAGE C STRICT VOLATILE
|
||||
AS 'citus', $$column_store_memory_stats$$;
|
||||
CREATE FUNCTION top_memory_context_usage()
|
||||
RETURNS BIGINT AS $$
|
||||
SELECT TopMemoryContext FROM column_store_memory_stats();
|
||||
$$ LANGUAGE SQL VOLATILE;
|
||||
SET cstore.stripe_row_count TO 50000;
|
||||
SET cstore.compression TO 'pglz';
|
||||
CREATE TABLE t (a int, tag text, memusage bigint) USING columnar;
|
||||
-- measure memory before doing writes
|
||||
SELECT TopMemoryContext as top_pre,
|
||||
WriteStateContext write_pre
|
||||
FROM column_store_memory_stats() \gset
|
||||
BEGIN;
|
||||
SET LOCAL client_min_messages TO DEBUG1;
|
||||
-- measure memory just before flushing 1st stripe
|
||||
INSERT INTO t
|
||||
SELECT i, 'first batch',
|
||||
-- sample memusage instead of recording everyr row for speed
|
||||
CASE WHEN i % 100 = 0 THEN top_memory_context_usage() ELSE 0 END
|
||||
FROM generate_series(1, 49999) i;
|
||||
SELECT TopMemoryContext as top0,
|
||||
TopTransactionContext xact0,
|
||||
WriteStateContext write0
|
||||
FROM column_store_memory_stats() \gset
|
||||
-- flush 1st stripe, and measure memory just before flushing 2nd stripe
|
||||
INSERT INTO t
|
||||
SELECT i, 'second batch', 0 /* no need to record memusage per row */
|
||||
FROM generate_series(1, 50000) i;
|
||||
DEBUG: Flushing Stripe of size 50000
|
||||
SELECT TopMemoryContext as top1,
|
||||
TopTransactionContext xact1,
|
||||
WriteStateContext write1
|
||||
FROM column_store_memory_stats() \gset
|
||||
-- flush 2nd stripe, and measure memory just before flushing 3rd stripe
|
||||
INSERT INTO t
|
||||
SELECT i, 'third batch', 0 /* no need to record memusage per row */
|
||||
FROM generate_series(1, 50000) i;
|
||||
DEBUG: Flushing Stripe of size 50000
|
||||
SELECT TopMemoryContext as top2,
|
||||
TopTransactionContext xact2,
|
||||
WriteStateContext write2
|
||||
FROM column_store_memory_stats() \gset
|
||||
-- insert a large batch
|
||||
INSERT INTO t
|
||||
SELECT i, 'large batch',
|
||||
-- sample memusage instead of recording everyr row for speed
|
||||
CASE WHEN i % 100 = 0 THEN top_memory_context_usage() ELSE 0 END
|
||||
FROM generate_series(1, 100000) i;
|
||||
DEBUG: Flushing Stripe of size 50000
|
||||
DEBUG: Flushing Stripe of size 50000
|
||||
COMMIT;
|
||||
DEBUG: Flushing Stripe of size 49999
|
||||
-- measure memory after doing writes
|
||||
SELECT TopMemoryContext as top_post,
|
||||
WriteStateContext write_post
|
||||
FROM column_store_memory_stats() \gset
|
||||
\x
|
||||
SELECT (1.0 * :top2/:top1 BETWEEN 0.99 AND 1.01) AS top_growth_ok,
|
||||
(1.0 * :xact1/:xact0 BETWEEN 0.99 AND 1.01) AND
|
||||
(1.0 * :xact2/:xact0 BETWEEN 0.99 AND 1.01) AS xact_growth_ok,
|
||||
(1.0 * :write1/:write0 BETWEEN 0.99 AND 1.01) AND
|
||||
(1.0 * :write2/:write0 BETWEEN 0.99 AND 1.01) AS write_growth_ok,
|
||||
:write_pre = 0 AND :write_post = 0 AS write_clear_outside_xact;
|
||||
-[ RECORD 1 ]------------+--
|
||||
top_growth_ok | t
|
||||
xact_growth_ok | t
|
||||
write_growth_ok | t
|
||||
write_clear_outside_xact | t
|
||||
|
||||
-- inserting another bunch of rows should not grow top memory context
|
||||
INSERT INTO t
|
||||
SELECT i, 'last batch', 0 /* no need to record memusage per row */
|
||||
FROM generate_series(1, 50000) i;
|
||||
SELECT 1.0 * TopMemoryContext / :top_post BETWEEN 0.99 AND 1.01 AS top_growth_ok
|
||||
FROM column_store_memory_stats();
|
||||
-[ RECORD 1 ]-+--
|
||||
top_growth_ok | t
|
||||
|
||||
-- before this change, max mem usage while executing inserts was 28MB and
|
||||
-- with this change it's less than 8MB.
|
||||
SELECT
|
||||
(SELECT max(memusage) < 8 * 1024 * 1024 FROM t WHERE tag='large batch') AS large_batch_ok,
|
||||
(SELECT max(memusage) < 8 * 1024 * 1024 FROM t WHERE tag='first batch') AS first_batch_ok;
|
||||
-[ RECORD 1 ]--+--
|
||||
large_batch_ok | t
|
||||
first_batch_ok | t
|
||||
|
||||
\x
|
||||
SELECT count(*) FROM t;
|
||||
count
|
||||
---------------------------------------------------------------------
|
||||
299999
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA columnar_memory CASCADE;
|
|
@ -0,0 +1,102 @@
|
|||
--
|
||||
-- Testing memory usage of columnar tables.
|
||||
--
|
||||
|
||||
CREATE SCHEMA columnar_memory;
|
||||
SET search_path TO 'columnar_memory';
|
||||
|
||||
CREATE OR REPLACE FUNCTION column_store_memory_stats()
|
||||
RETURNS TABLE(TopMemoryContext BIGINT,
|
||||
TopTransactionContext BIGINT,
|
||||
WriteStateContext BIGINT)
|
||||
LANGUAGE C STRICT VOLATILE
|
||||
AS 'citus', $$column_store_memory_stats$$;
|
||||
|
||||
CREATE FUNCTION top_memory_context_usage()
|
||||
RETURNS BIGINT AS $$
|
||||
SELECT TopMemoryContext FROM column_store_memory_stats();
|
||||
$$ LANGUAGE SQL VOLATILE;
|
||||
|
||||
SET cstore.stripe_row_count TO 50000;
|
||||
SET cstore.compression TO 'pglz';
|
||||
CREATE TABLE t (a int, tag text, memusage bigint) USING columnar;
|
||||
|
||||
-- measure memory before doing writes
|
||||
SELECT TopMemoryContext as top_pre,
|
||||
WriteStateContext write_pre
|
||||
FROM column_store_memory_stats() \gset
|
||||
|
||||
BEGIN;
|
||||
SET LOCAL client_min_messages TO DEBUG1;
|
||||
|
||||
-- measure memory just before flushing 1st stripe
|
||||
INSERT INTO t
|
||||
SELECT i, 'first batch',
|
||||
-- sample memusage instead of recording everyr row for speed
|
||||
CASE WHEN i % 100 = 0 THEN top_memory_context_usage() ELSE 0 END
|
||||
FROM generate_series(1, 49999) i;
|
||||
SELECT TopMemoryContext as top0,
|
||||
TopTransactionContext xact0,
|
||||
WriteStateContext write0
|
||||
FROM column_store_memory_stats() \gset
|
||||
|
||||
-- flush 1st stripe, and measure memory just before flushing 2nd stripe
|
||||
INSERT INTO t
|
||||
SELECT i, 'second batch', 0 /* no need to record memusage per row */
|
||||
FROM generate_series(1, 50000) i;
|
||||
SELECT TopMemoryContext as top1,
|
||||
TopTransactionContext xact1,
|
||||
WriteStateContext write1
|
||||
FROM column_store_memory_stats() \gset
|
||||
|
||||
-- flush 2nd stripe, and measure memory just before flushing 3rd stripe
|
||||
INSERT INTO t
|
||||
SELECT i, 'third batch', 0 /* no need to record memusage per row */
|
||||
FROM generate_series(1, 50000) i;
|
||||
SELECT TopMemoryContext as top2,
|
||||
TopTransactionContext xact2,
|
||||
WriteStateContext write2
|
||||
FROM column_store_memory_stats() \gset
|
||||
|
||||
-- insert a large batch
|
||||
INSERT INTO t
|
||||
SELECT i, 'large batch',
|
||||
-- sample memusage instead of recording everyr row for speed
|
||||
CASE WHEN i % 100 = 0 THEN top_memory_context_usage() ELSE 0 END
|
||||
FROM generate_series(1, 100000) i;
|
||||
|
||||
COMMIT;
|
||||
|
||||
-- measure memory after doing writes
|
||||
SELECT TopMemoryContext as top_post,
|
||||
WriteStateContext write_post
|
||||
FROM column_store_memory_stats() \gset
|
||||
|
||||
\x
|
||||
SELECT (1.0 * :top2/:top1 BETWEEN 0.99 AND 1.01) AS top_growth_ok,
|
||||
(1.0 * :xact1/:xact0 BETWEEN 0.99 AND 1.01) AND
|
||||
(1.0 * :xact2/:xact0 BETWEEN 0.99 AND 1.01) AS xact_growth_ok,
|
||||
(1.0 * :write1/:write0 BETWEEN 0.99 AND 1.01) AND
|
||||
(1.0 * :write2/:write0 BETWEEN 0.99 AND 1.01) AS write_growth_ok,
|
||||
:write_pre = 0 AND :write_post = 0 AS write_clear_outside_xact;
|
||||
|
||||
-- inserting another bunch of rows should not grow top memory context
|
||||
INSERT INTO t
|
||||
SELECT i, 'last batch', 0 /* no need to record memusage per row */
|
||||
FROM generate_series(1, 50000) i;
|
||||
|
||||
SELECT 1.0 * TopMemoryContext / :top_post BETWEEN 0.99 AND 1.01 AS top_growth_ok
|
||||
FROM column_store_memory_stats();
|
||||
|
||||
-- before this change, max mem usage while executing inserts was 28MB and
|
||||
-- with this change it's less than 8MB.
|
||||
SELECT
|
||||
(SELECT max(memusage) < 8 * 1024 * 1024 FROM t WHERE tag='large batch') AS large_batch_ok,
|
||||
(SELECT max(memusage) < 8 * 1024 * 1024 FROM t WHERE tag='first batch') AS first_batch_ok;
|
||||
|
||||
\x
|
||||
|
||||
SELECT count(*) FROM t;
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA columnar_memory CASCADE;
|
Loading…
Reference in New Issue