mirror of https://github.com/citusdata/citus.git
Columnar: make read and write state private.
parent
353b080474
commit
2ea31c899e
|
@ -490,7 +490,7 @@ ColumnarScan_ExplainCustomScan(CustomScanState *node, List *ancestors,
|
||||||
|
|
||||||
if (scanDesc != NULL)
|
if (scanDesc != NULL)
|
||||||
{
|
{
|
||||||
int64 chunkGroupsFiltered = ColumnarGetChunkGroupsFiltered(scanDesc);
|
int64 chunkGroupsFiltered = ColumnarScanChunkGroupsFiltered(scanDesc);
|
||||||
ExplainPropertyInteger("Columnar Chunk Groups Removed by Filter", NULL,
|
ExplainPropertyInteger("Columnar Chunk Groups Removed by Filter", NULL,
|
||||||
chunkGroupsFiltered, es);
|
chunkGroupsFiltered, es);
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,6 +39,30 @@
|
||||||
#include "columnar/columnar.h"
|
#include "columnar/columnar.h"
|
||||||
#include "columnar/columnar_version_compat.h"
|
#include "columnar/columnar_version_compat.h"
|
||||||
|
|
||||||
|
struct TableReadState
|
||||||
|
{
|
||||||
|
List *stripeList;
|
||||||
|
StripeMetadata *currentStripeMetadata;
|
||||||
|
TupleDesc tupleDescriptor;
|
||||||
|
Relation relation;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* List of Var pointers for columns in the query. We use this both for
|
||||||
|
* getting vector of projected columns, and also when we want to build
|
||||||
|
* base constraint to find selected row chunks.
|
||||||
|
*/
|
||||||
|
List *projectedColumnList;
|
||||||
|
|
||||||
|
List *whereClauseList;
|
||||||
|
MemoryContext stripeReadContext;
|
||||||
|
StripeBuffers *stripeBuffers;
|
||||||
|
uint32 readStripeCount;
|
||||||
|
uint64 stripeReadRowCount;
|
||||||
|
int64 chunkGroupsFiltered;
|
||||||
|
ChunkData *chunkData;
|
||||||
|
int32 deserializedChunkIndex;
|
||||||
|
};
|
||||||
|
|
||||||
/* static function declarations */
|
/* static function declarations */
|
||||||
static StripeBuffers * LoadFilteredStripeBuffers(Relation relation,
|
static StripeBuffers * LoadFilteredStripeBuffers(Relation relation,
|
||||||
StripeMetadata *stripeMetadata,
|
StripeMetadata *stripeMetadata,
|
||||||
|
@ -235,6 +259,18 @@ ColumnarEndRead(TableReadState *readState)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ColumnarReadChunkGroupsFiltered
|
||||||
|
*
|
||||||
|
* Return the number of chunk groups filtered during this read operation.
|
||||||
|
*/
|
||||||
|
int64
|
||||||
|
ColumnarReadChunkGroupsFiltered(TableReadState *state)
|
||||||
|
{
|
||||||
|
return state->chunkGroupsFiltered;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CreateEmptyChunkDataArray creates data buffers to keep deserialized exist and
|
* CreateEmptyChunkDataArray creates data buffers to keep deserialized exist and
|
||||||
* value arrays for requested columns in columnMask.
|
* value arrays for requested columns in columnMask.
|
||||||
|
|
|
@ -443,7 +443,8 @@ columnar_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
|
||||||
TableWriteState *writeState = columnar_init_write_state(relation,
|
TableWriteState *writeState = columnar_init_write_state(relation,
|
||||||
RelationGetDescr(relation),
|
RelationGetDescr(relation),
|
||||||
GetCurrentSubTransactionId());
|
GetCurrentSubTransactionId());
|
||||||
MemoryContext oldContext = MemoryContextSwitchTo(writeState->perTupleContext);
|
MemoryContext oldContext = MemoryContextSwitchTo(ColumnarWritePerTupleContext(
|
||||||
|
writeState));
|
||||||
|
|
||||||
ColumnarCheckLogicalReplication(relation);
|
ColumnarCheckLogicalReplication(relation);
|
||||||
|
|
||||||
|
@ -455,7 +456,7 @@ columnar_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
|
||||||
ColumnarWriteRow(writeState, values, slot->tts_isnull);
|
ColumnarWriteRow(writeState, values, slot->tts_isnull);
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
MemoryContextSwitchTo(oldContext);
|
||||||
MemoryContextReset(writeState->perTupleContext);
|
MemoryContextReset(ColumnarWritePerTupleContext(writeState));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -486,7 +487,8 @@ columnar_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
|
||||||
|
|
||||||
ColumnarCheckLogicalReplication(relation);
|
ColumnarCheckLogicalReplication(relation);
|
||||||
|
|
||||||
MemoryContext oldContext = MemoryContextSwitchTo(writeState->perTupleContext);
|
MemoryContext oldContext = MemoryContextSwitchTo(ColumnarWritePerTupleContext(
|
||||||
|
writeState));
|
||||||
|
|
||||||
for (int i = 0; i < ntuples; i++)
|
for (int i = 0; i < ntuples; i++)
|
||||||
{
|
{
|
||||||
|
@ -498,7 +500,7 @@ columnar_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
|
||||||
tupleSlot->tts_values, tupleSlot->tts_isnull);
|
tupleSlot->tts_values, tupleSlot->tts_isnull);
|
||||||
|
|
||||||
ColumnarWriteRow(writeState, values, tupleSlot->tts_isnull);
|
ColumnarWriteRow(writeState, values, tupleSlot->tts_isnull);
|
||||||
MemoryContextReset(writeState->perTupleContext);
|
MemoryContextReset(ColumnarWritePerTupleContext(writeState));
|
||||||
}
|
}
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
@ -1158,7 +1160,7 @@ columnar_tableam_finish()
|
||||||
* Get the number of chunks filtered out during the given scan.
|
* Get the number of chunks filtered out during the given scan.
|
||||||
*/
|
*/
|
||||||
int64
|
int64
|
||||||
ColumnarGetChunkGroupsFiltered(TableScanDesc scanDesc)
|
ColumnarScanChunkGroupsFiltered(TableScanDesc scanDesc)
|
||||||
{
|
{
|
||||||
ColumnarScanDesc columnarScanDesc = (ColumnarScanDesc) scanDesc;
|
ColumnarScanDesc columnarScanDesc = (ColumnarScanDesc) scanDesc;
|
||||||
TableReadState *readState = columnarScanDesc->cs_readState;
|
TableReadState *readState = columnarScanDesc->cs_readState;
|
||||||
|
@ -1166,7 +1168,7 @@ ColumnarGetChunkGroupsFiltered(TableScanDesc scanDesc)
|
||||||
/* readState is initialized lazily */
|
/* readState is initialized lazily */
|
||||||
if (readState != NULL)
|
if (readState != NULL)
|
||||||
{
|
{
|
||||||
return readState->chunkGroupsFiltered;
|
return ColumnarReadChunkGroupsFiltered(readState);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|
|
@ -32,6 +32,28 @@
|
||||||
#include "columnar/columnar.h"
|
#include "columnar/columnar.h"
|
||||||
#include "columnar/columnar_version_compat.h"
|
#include "columnar/columnar_version_compat.h"
|
||||||
|
|
||||||
|
struct TableWriteState
|
||||||
|
{
|
||||||
|
TupleDesc tupleDescriptor;
|
||||||
|
FmgrInfo **comparisonFunctionArray;
|
||||||
|
RelFileNode relfilenode;
|
||||||
|
|
||||||
|
MemoryContext stripeWriteContext;
|
||||||
|
MemoryContext perTupleContext;
|
||||||
|
StripeBuffers *stripeBuffers;
|
||||||
|
StripeSkipList *stripeSkipList;
|
||||||
|
ColumnarOptions options;
|
||||||
|
ChunkData *chunkData;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* compressionBuffer buffer is used as temporary storage during
|
||||||
|
* data value compression operation. It is kept here to minimize
|
||||||
|
* memory allocations. It lives in stripeWriteContext and gets
|
||||||
|
* deallocated when memory context is reset.
|
||||||
|
*/
|
||||||
|
StringInfo compressionBuffer;
|
||||||
|
};
|
||||||
|
|
||||||
static StripeBuffers * CreateEmptyStripeBuffers(uint32 stripeMaxRowCount,
|
static StripeBuffers * CreateEmptyStripeBuffers(uint32 stripeMaxRowCount,
|
||||||
uint32 chunkRowCount,
|
uint32 chunkRowCount,
|
||||||
uint32 columnCount);
|
uint32 columnCount);
|
||||||
|
@ -247,6 +269,18 @@ ColumnarFlushPendingWrites(TableWriteState *writeState)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ColumnarWritePerTupleContext
|
||||||
|
*
|
||||||
|
* Return per-tuple context for columnar write operation.
|
||||||
|
*/
|
||||||
|
MemoryContext
|
||||||
|
ColumnarWritePerTupleContext(TableWriteState *state)
|
||||||
|
{
|
||||||
|
return state->perTupleContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CreateEmptyStripeBuffers allocates an empty StripeBuffers structure with the given
|
* CreateEmptyStripeBuffers allocates an empty StripeBuffers structure with the given
|
||||||
* column count.
|
* column count.
|
||||||
|
|
|
@ -213,53 +213,13 @@ typedef struct StripeBuffers
|
||||||
|
|
||||||
|
|
||||||
/* TableReadState represents state of a columnar scan. */
|
/* TableReadState represents state of a columnar scan. */
|
||||||
typedef struct TableReadState
|
struct TableReadState;
|
||||||
{
|
typedef struct TableReadState TableReadState;
|
||||||
List *stripeList;
|
|
||||||
StripeMetadata *currentStripeMetadata;
|
|
||||||
TupleDesc tupleDescriptor;
|
|
||||||
Relation relation;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* List of Var pointers for columns in the query. We use this both for
|
|
||||||
* getting vector of projected columns, and also when we want to build
|
|
||||||
* base constraint to find selected row chunks.
|
|
||||||
*/
|
|
||||||
List *projectedColumnList;
|
|
||||||
|
|
||||||
List *whereClauseList;
|
|
||||||
MemoryContext stripeReadContext;
|
|
||||||
StripeBuffers *stripeBuffers;
|
|
||||||
uint32 readStripeCount;
|
|
||||||
uint64 stripeReadRowCount;
|
|
||||||
int64 chunkGroupsFiltered;
|
|
||||||
ChunkData *chunkData;
|
|
||||||
int32 deserializedChunkIndex;
|
|
||||||
} TableReadState;
|
|
||||||
|
|
||||||
|
|
||||||
/* TableWriteState represents state of a columnar write operation. */
|
/* TableWriteState represents state of a columnar write operation. */
|
||||||
typedef struct TableWriteState
|
struct TableWriteState;
|
||||||
{
|
typedef struct TableWriteState TableWriteState;
|
||||||
TupleDesc tupleDescriptor;
|
|
||||||
FmgrInfo **comparisonFunctionArray;
|
|
||||||
RelFileNode relfilenode;
|
|
||||||
|
|
||||||
MemoryContext stripeWriteContext;
|
|
||||||
MemoryContext perTupleContext;
|
|
||||||
StripeBuffers *stripeBuffers;
|
|
||||||
StripeSkipList *stripeSkipList;
|
|
||||||
ColumnarOptions options;
|
|
||||||
ChunkData *chunkData;
|
|
||||||
|
|
||||||
/*
|
|
||||||
* compressionBuffer buffer is used as temporary storage during
|
|
||||||
* data value compression operation. It is kept here to minimize
|
|
||||||
* memory allocations. It lives in stripeWriteContext and gets
|
|
||||||
* deallocated when memory context is reset.
|
|
||||||
*/
|
|
||||||
StringInfo compressionBuffer;
|
|
||||||
} TableWriteState;
|
|
||||||
|
|
||||||
extern int columnar_compression;
|
extern int columnar_compression;
|
||||||
extern int columnar_stripe_row_limit;
|
extern int columnar_stripe_row_limit;
|
||||||
|
@ -279,6 +239,7 @@ extern void ColumnarWriteRow(TableWriteState *state, Datum *columnValues,
|
||||||
extern void ColumnarFlushPendingWrites(TableWriteState *state);
|
extern void ColumnarFlushPendingWrites(TableWriteState *state);
|
||||||
extern void ColumnarEndWrite(TableWriteState *state);
|
extern void ColumnarEndWrite(TableWriteState *state);
|
||||||
extern bool ContainsPendingWrites(TableWriteState *state);
|
extern bool ContainsPendingWrites(TableWriteState *state);
|
||||||
|
extern MemoryContext ColumnarWritePerTupleContext(TableWriteState *state);
|
||||||
|
|
||||||
/* Function declarations for reading from columnar table */
|
/* Function declarations for reading from columnar table */
|
||||||
extern TableReadState * ColumnarBeginRead(Relation relation,
|
extern TableReadState * ColumnarBeginRead(Relation relation,
|
||||||
|
@ -289,6 +250,7 @@ extern bool ColumnarReadNextRow(TableReadState *state, Datum *columnValues,
|
||||||
bool *columnNulls);
|
bool *columnNulls);
|
||||||
extern void ColumnarRescan(TableReadState *readState);
|
extern void ColumnarRescan(TableReadState *readState);
|
||||||
extern void ColumnarEndRead(TableReadState *state);
|
extern void ColumnarEndRead(TableReadState *state);
|
||||||
|
extern int64 ColumnarReadChunkGroupsFiltered(TableReadState *state);
|
||||||
|
|
||||||
/* Function declarations for common functions */
|
/* Function declarations for common functions */
|
||||||
extern FmgrInfo * GetFunctionInfoOrNull(Oid typeId, Oid accessMethodId,
|
extern FmgrInfo * GetFunctionInfoOrNull(Oid typeId, Oid accessMethodId,
|
||||||
|
|
|
@ -18,7 +18,7 @@ extern TableScanDesc columnar_beginscan_extended(Relation relation, Snapshot sna
|
||||||
ParallelTableScanDesc parallel_scan,
|
ParallelTableScanDesc parallel_scan,
|
||||||
uint32 flags, Bitmapset *attr_needed,
|
uint32 flags, Bitmapset *attr_needed,
|
||||||
List *scanQual);
|
List *scanQual);
|
||||||
extern int64 ColumnarGetChunkGroupsFiltered(TableScanDesc scanDesc);
|
extern int64 ColumnarScanChunkGroupsFiltered(TableScanDesc scanDesc);
|
||||||
extern bool IsColumnarTableAmTable(Oid relationId);
|
extern bool IsColumnarTableAmTable(Oid relationId);
|
||||||
extern TableDDLCommand * ColumnarGetTableOptionsDDL(Oid relationId);
|
extern TableDDLCommand * ColumnarGetTableOptionsDDL(Oid relationId);
|
||||||
extern char * GetShardedTableDDLCommandColumnar(uint64 shardId, void *context);
|
extern char * GetShardedTableDDLCommandColumnar(uint64 shardId, void *context);
|
||||||
|
|
Loading…
Reference in New Issue