mirror of https://github.com/citusdata/citus.git
Add first_row_number column to columnar.stripe for tid mapping
parent
9c1ac3127f
commit
2e419ea177
|
@ -27,6 +27,7 @@
|
||||||
#include "columnar/columnar.h"
|
#include "columnar/columnar.h"
|
||||||
#include "columnar/columnar_storage.h"
|
#include "columnar/columnar_storage.h"
|
||||||
#include "columnar/columnar_version_compat.h"
|
#include "columnar/columnar_version_compat.h"
|
||||||
|
#include "distributed/listutils.h"
|
||||||
|
|
||||||
#include <sys/stat.h>
|
#include <sys/stat.h>
|
||||||
#include "access/heapam.h"
|
#include "access/heapam.h"
|
||||||
|
@ -83,6 +84,7 @@ static Oid ColumnarChunkIndexRelationId(void);
|
||||||
static Oid ColumnarChunkGroupIndexRelationId(void);
|
static Oid ColumnarChunkGroupIndexRelationId(void);
|
||||||
static Oid ColumnarNamespaceId(void);
|
static Oid ColumnarNamespaceId(void);
|
||||||
static uint64 LookupStorageId(RelFileNode relfilenode);
|
static uint64 LookupStorageId(RelFileNode relfilenode);
|
||||||
|
static uint64 GetHighestUsedFirstRowNumber(uint64 storageId);
|
||||||
static void DeleteStorageFromColumnarMetadataTable(Oid metadataTableId,
|
static void DeleteStorageFromColumnarMetadataTable(Oid metadataTableId,
|
||||||
AttrNumber storageIdAtrrNumber,
|
AttrNumber storageIdAtrrNumber,
|
||||||
Oid storageIdIndexId,
|
Oid storageIdIndexId,
|
||||||
|
@ -126,7 +128,7 @@ typedef FormData_columnar_options *Form_columnar_options;
|
||||||
|
|
||||||
|
|
||||||
/* constants for columnar.stripe */
|
/* constants for columnar.stripe */
|
||||||
#define Natts_columnar_stripe 8
|
#define Natts_columnar_stripe 9
|
||||||
#define Anum_columnar_stripe_storageid 1
|
#define Anum_columnar_stripe_storageid 1
|
||||||
#define Anum_columnar_stripe_stripe 2
|
#define Anum_columnar_stripe_stripe 2
|
||||||
#define Anum_columnar_stripe_file_offset 3
|
#define Anum_columnar_stripe_file_offset 3
|
||||||
|
@ -135,6 +137,7 @@ typedef FormData_columnar_options *Form_columnar_options;
|
||||||
#define Anum_columnar_stripe_chunk_row_count 6
|
#define Anum_columnar_stripe_chunk_row_count 6
|
||||||
#define Anum_columnar_stripe_row_count 7
|
#define Anum_columnar_stripe_row_count 7
|
||||||
#define Anum_columnar_stripe_chunk_count 8
|
#define Anum_columnar_stripe_chunk_count 8
|
||||||
|
#define Anum_columnar_stripe_first_row_number 9
|
||||||
|
|
||||||
/* constants for columnar.chunk_group */
|
/* constants for columnar.chunk_group */
|
||||||
#define Natts_columnar_chunkgroup 4
|
#define Natts_columnar_chunkgroup 4
|
||||||
|
@ -690,7 +693,8 @@ InsertStripeMetadataRow(uint64 storageId, StripeMetadata *stripe)
|
||||||
Int32GetDatum(stripe->columnCount),
|
Int32GetDatum(stripe->columnCount),
|
||||||
Int32GetDatum(stripe->chunkGroupRowCount),
|
Int32GetDatum(stripe->chunkGroupRowCount),
|
||||||
Int64GetDatum(stripe->rowCount),
|
Int64GetDatum(stripe->rowCount),
|
||||||
Int32GetDatum(stripe->chunkCount)
|
Int32GetDatum(stripe->chunkCount),
|
||||||
|
UInt64GetDatum(stripe->firstRowNumber)
|
||||||
};
|
};
|
||||||
|
|
||||||
Oid columnarStripesOid = ColumnarStripeRelationId();
|
Oid columnarStripesOid = ColumnarStripeRelationId();
|
||||||
|
@ -781,18 +785,14 @@ GetHighestUsedAddressAndId(uint64 storageId,
|
||||||
StripeMetadata
|
StripeMetadata
|
||||||
ReserveStripe(Relation rel, uint64 sizeBytes,
|
ReserveStripe(Relation rel, uint64 sizeBytes,
|
||||||
uint64 rowCount, uint64 columnCount,
|
uint64 rowCount, uint64 columnCount,
|
||||||
uint64 chunkCount, uint64 chunkGroupRowCount)
|
uint64 chunkCount, uint64 chunkGroupRowCount,
|
||||||
|
uint64 stripeFirstRowNumber)
|
||||||
{
|
{
|
||||||
StripeMetadata stripe = { 0 };
|
StripeMetadata stripe = { 0 };
|
||||||
|
|
||||||
uint64 storageId = ColumnarStorageGetStorageId(rel, false);
|
uint64 storageId = ColumnarStorageGetStorageId(rel, false);
|
||||||
|
|
||||||
/*
|
uint64 stripeId = ColumnarStorageReserveStripe(rel);
|
||||||
* TODO: For now, we don't use row number reservation at all, so just use
|
|
||||||
* dummy values.
|
|
||||||
*/
|
|
||||||
uint64 firstReservedRow;
|
|
||||||
uint64 stripeId = ColumnarStorageReserveStripe(rel, 0, &firstReservedRow);
|
|
||||||
uint64 resLogicalStart = ColumnarStorageReserveData(rel, sizeBytes);
|
uint64 resLogicalStart = ColumnarStorageReserveData(rel, sizeBytes);
|
||||||
|
|
||||||
stripe.fileOffset = resLogicalStart;
|
stripe.fileOffset = resLogicalStart;
|
||||||
|
@ -802,6 +802,7 @@ ReserveStripe(Relation rel, uint64 sizeBytes,
|
||||||
stripe.columnCount = columnCount;
|
stripe.columnCount = columnCount;
|
||||||
stripe.rowCount = rowCount;
|
stripe.rowCount = rowCount;
|
||||||
stripe.id = stripeId;
|
stripe.id = stripeId;
|
||||||
|
stripe.firstRowNumber = stripeFirstRowNumber;
|
||||||
|
|
||||||
InsertStripeMetadataRow(storageId, &stripe);
|
InsertStripeMetadataRow(storageId, &stripe);
|
||||||
|
|
||||||
|
@ -854,6 +855,8 @@ ReadDataFileStripeList(uint64 storageId, Snapshot snapshot)
|
||||||
datumArray[Anum_columnar_stripe_chunk_row_count - 1]);
|
datumArray[Anum_columnar_stripe_chunk_row_count - 1]);
|
||||||
stripeMetadata->rowCount = DatumGetInt64(
|
stripeMetadata->rowCount = DatumGetInt64(
|
||||||
datumArray[Anum_columnar_stripe_row_count - 1]);
|
datumArray[Anum_columnar_stripe_row_count - 1]);
|
||||||
|
stripeMetadata->firstRowNumber = DatumGetUInt64(
|
||||||
|
datumArray[Anum_columnar_stripe_first_row_number - 1]);
|
||||||
|
|
||||||
stripeMetadataList = lappend(stripeMetadataList, stripeMetadata);
|
stripeMetadataList = lappend(stripeMetadataList, stripeMetadata);
|
||||||
}
|
}
|
||||||
|
@ -1294,10 +1297,42 @@ ColumnarStorageUpdateIfNeeded(Relation rel, bool isUpgrade)
|
||||||
GetHighestUsedAddressAndId(storageId, &highestOffset, &highestId);
|
GetHighestUsedAddressAndId(storageId, &highestOffset, &highestId);
|
||||||
|
|
||||||
uint64 reservedStripeId = highestId + 1;
|
uint64 reservedStripeId = highestId + 1;
|
||||||
|
|
||||||
/* XXX: should be set properly */
|
|
||||||
uint64 reservedRowNumber = 0;
|
|
||||||
uint64 reservedOffset = highestOffset + 1;
|
uint64 reservedOffset = highestOffset + 1;
|
||||||
|
uint64 reservedRowNumber = GetHighestUsedFirstRowNumber(storageId) + 1;
|
||||||
ColumnarStorageUpdateCurrent(rel, isUpgrade, reservedStripeId,
|
ColumnarStorageUpdateCurrent(rel, isUpgrade, reservedStripeId,
|
||||||
reservedRowNumber, reservedOffset);
|
reservedRowNumber, reservedOffset);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GetHighestUsedFirstRowNumber returns the highest used first_row_number
|
||||||
|
* for given storageId. Returns COLUMNAR_INVALID_ROW_NUMBER if storage with
|
||||||
|
* storageId has no stripes.
|
||||||
|
* Note that normally we would use ColumnarStorageGetReservedRowNumber
|
||||||
|
* to decide that. However, this function is designed to be used when
|
||||||
|
* building the metapage itself during upgrades.
|
||||||
|
*/
|
||||||
|
static uint64
|
||||||
|
GetHighestUsedFirstRowNumber(uint64 storageId)
|
||||||
|
{
|
||||||
|
List *stripeMetadataList = ReadDataFileStripeList(storageId,
|
||||||
|
GetTransactionSnapshot());
|
||||||
|
if (list_length(stripeMetadataList) == 0)
|
||||||
|
{
|
||||||
|
return COLUMNAR_INVALID_ROW_NUMBER;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* XXX: Better to have an invalid value for StripeMetadata.rowCount too */
|
||||||
|
uint64 stripeRowCount = -1;
|
||||||
|
uint64 highestFirstRowNumber = COLUMNAR_INVALID_ROW_NUMBER;
|
||||||
|
|
||||||
|
StripeMetadata *stripeMetadata = NULL;
|
||||||
|
foreach_ptr(stripeMetadata, stripeMetadataList)
|
||||||
|
{
|
||||||
|
highestFirstRowNumber = Max(highestFirstRowNumber,
|
||||||
|
stripeMetadata->firstRowNumber);
|
||||||
|
stripeRowCount = stripeMetadata->rowCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
return highestFirstRowNumber + stripeRowCount - 1;
|
||||||
|
}
|
||||||
|
|
|
@ -195,11 +195,12 @@ CreateStripeReadMemoryContext()
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ColumnarReadNextRow tries to read a row from the columnar table. On success, it sets
|
* ColumnarReadNextRow tries to read a row from the columnar table. On success, it sets
|
||||||
* column values and nulls, and returns true. If there are no more rows to read,
|
* column values, column nulls and rowNumber (if passed to be non-NULL), and returns true.
|
||||||
* the function returns false.
|
* If there are no more rows to read, the function returns false.
|
||||||
*/
|
*/
|
||||||
bool
|
bool
|
||||||
ColumnarReadNextRow(ColumnarReadState *readState, Datum *columnValues, bool *columnNulls)
|
ColumnarReadNextRow(ColumnarReadState *readState, Datum *columnValues, bool *columnNulls,
|
||||||
|
uint64 *rowNumber)
|
||||||
{
|
{
|
||||||
while (true)
|
while (true)
|
||||||
{
|
{
|
||||||
|
@ -227,6 +228,14 @@ ColumnarReadNextRow(ColumnarReadState *readState, Datum *columnValues, bool *col
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (rowNumber)
|
||||||
|
{
|
||||||
|
StripeMetadata *stripeMetadata = list_nth(readState->stripeList,
|
||||||
|
readState->currentStripe);
|
||||||
|
*rowNumber = stripeMetadata->firstRowNumber +
|
||||||
|
readState->stripeReadState->currentRow - 1;
|
||||||
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -85,8 +85,6 @@ typedef struct PhysicalAddr
|
||||||
#define COLUMNAR_EMPTY_BLOCKNO 1
|
#define COLUMNAR_EMPTY_BLOCKNO 1
|
||||||
#define COLUMNAR_INVALID_STRIPE_ID 0
|
#define COLUMNAR_INVALID_STRIPE_ID 0
|
||||||
#define COLUMNAR_FIRST_STRIPE_ID 1
|
#define COLUMNAR_FIRST_STRIPE_ID 1
|
||||||
#define COLUMNAR_INVALID_ROW_NUMBER 0
|
|
||||||
#define COLUMNAR_FIRST_ROW_NUMBER 1
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -347,10 +345,37 @@ ColumnarStorageIsCurrent(Relation rel)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ColumnarStorageReserveStripe - reserve stripe ID and row numbers.
|
* ColumnarStorageReserveRowNumber returns reservedRowNumber and advances
|
||||||
|
* it for next row number reservation.
|
||||||
*/
|
*/
|
||||||
uint64
|
uint64
|
||||||
ColumnarStorageReserveStripe(Relation rel, uint64 nrows, uint64 *firstRowNumber)
|
ColumnarStorageReserveRowNumber(Relation rel, uint64 nrows)
|
||||||
|
{
|
||||||
|
LockRelationForExtension(rel, ExclusiveLock);
|
||||||
|
|
||||||
|
ColumnarMetapage metapage = ColumnarMetapageRead(rel, false);
|
||||||
|
|
||||||
|
uint64 firstRowNumber = metapage.reservedRowNumber;
|
||||||
|
metapage.reservedRowNumber += nrows;
|
||||||
|
|
||||||
|
ColumnarOverwriteMetapage(rel, metapage);
|
||||||
|
|
||||||
|
UnlockRelationForExtension(rel, ExclusiveLock);
|
||||||
|
|
||||||
|
return firstRowNumber;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ColumnarStorageReserveStripe returns stripeId and advances it for next
|
||||||
|
* stripeId reservation.
|
||||||
|
* Note that this function doesn't handle row number reservation.
|
||||||
|
* This is because, unlike stripeId reservation, we immediately reserve
|
||||||
|
* row number during writes, not when flushing stripes to disk.
|
||||||
|
* See ColumnarStorageReserveRowNumber function.
|
||||||
|
*/
|
||||||
|
uint64
|
||||||
|
ColumnarStorageReserveStripe(Relation rel)
|
||||||
{
|
{
|
||||||
LockRelationForExtension(rel, ExclusiveLock);
|
LockRelationForExtension(rel, ExclusiveLock);
|
||||||
|
|
||||||
|
@ -359,9 +384,6 @@ ColumnarStorageReserveStripe(Relation rel, uint64 nrows, uint64 *firstRowNumber)
|
||||||
uint64 stripeId = metapage.reservedStripeId;
|
uint64 stripeId = metapage.reservedStripeId;
|
||||||
metapage.reservedStripeId++;
|
metapage.reservedStripeId++;
|
||||||
|
|
||||||
*firstRowNumber = metapage.reservedRowNumber;
|
|
||||||
metapage.reservedRowNumber += nrows;
|
|
||||||
|
|
||||||
ColumnarOverwriteMetapage(rel, metapage);
|
ColumnarOverwriteMetapage(rel, metapage);
|
||||||
|
|
||||||
UnlockRelationForExtension(rel, ExclusiveLock);
|
UnlockRelationForExtension(rel, ExclusiveLock);
|
||||||
|
|
|
@ -82,12 +82,6 @@ typedef struct ColumnarScanDescData
|
||||||
MemoryContext scanContext;
|
MemoryContext scanContext;
|
||||||
Bitmapset *attr_needed;
|
Bitmapset *attr_needed;
|
||||||
List *scanQual;
|
List *scanQual;
|
||||||
|
|
||||||
/*
|
|
||||||
* ANALYZE requires an item pointer for sorting. We keep track of row
|
|
||||||
* number so we can construct an item pointer based on that.
|
|
||||||
*/
|
|
||||||
uint64 rowNumber;
|
|
||||||
} ColumnarScanDescData;
|
} ColumnarScanDescData;
|
||||||
|
|
||||||
typedef struct ColumnarScanDescData *ColumnarScanDesc;
|
typedef struct ColumnarScanDescData *ColumnarScanDesc;
|
||||||
|
@ -116,6 +110,7 @@ static void TruncateColumnar(Relation rel, int elevel);
|
||||||
static HeapTuple ColumnarSlotCopyHeapTuple(TupleTableSlot *slot);
|
static HeapTuple ColumnarSlotCopyHeapTuple(TupleTableSlot *slot);
|
||||||
static void ColumnarCheckLogicalReplication(Relation rel);
|
static void ColumnarCheckLogicalReplication(Relation rel);
|
||||||
static Datum * detoast_values(TupleDesc tupleDesc, Datum *orig_values, bool *isnull);
|
static Datum * detoast_values(TupleDesc tupleDesc, Datum *orig_values, bool *isnull);
|
||||||
|
static ItemPointerData row_number_to_tid(uint64 rowNumber);
|
||||||
|
|
||||||
/* Custom tuple slot ops used for columnar. Initialized in columnar_tableam_init(). */
|
/* Custom tuple slot ops used for columnar. Initialized in columnar_tableam_init(). */
|
||||||
static TupleTableSlotOps TTSOpsColumnar;
|
static TupleTableSlotOps TTSOpsColumnar;
|
||||||
|
@ -265,8 +260,9 @@ columnar_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlo
|
||||||
|
|
||||||
ExecClearTuple(slot);
|
ExecClearTuple(slot);
|
||||||
|
|
||||||
|
uint64 rowNumber;
|
||||||
bool nextRowFound = ColumnarReadNextRow(scan->cs_readState, slot->tts_values,
|
bool nextRowFound = ColumnarReadNextRow(scan->cs_readState, slot->tts_values,
|
||||||
slot->tts_isnull);
|
slot->tts_isnull, &rowNumber);
|
||||||
|
|
||||||
if (!nextRowFound)
|
if (!nextRowFound)
|
||||||
{
|
{
|
||||||
|
@ -275,23 +271,41 @@ columnar_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlo
|
||||||
|
|
||||||
ExecStoreVirtualTuple(slot);
|
ExecStoreVirtualTuple(slot);
|
||||||
|
|
||||||
/*
|
slot->tts_tid = row_number_to_tid(rowNumber);
|
||||||
* Set slot's item pointer block & offset to non-zero. These are
|
|
||||||
* used just for sorting in acquire_sample_rows(), so rowNumber
|
|
||||||
* is good enough. See ColumnarSlotCopyHeapTuple for more info.
|
|
||||||
*
|
|
||||||
* offset is 16-bits, so use the first 15 bits for offset and
|
|
||||||
* rest as block number.
|
|
||||||
*/
|
|
||||||
ItemPointerSetBlockNumber(&(slot->tts_tid), scan->rowNumber / (32 * 1024) + 1);
|
|
||||||
ItemPointerSetOffsetNumber(&(slot->tts_tid), scan->rowNumber % (32 * 1024) + 1);
|
|
||||||
|
|
||||||
scan->rowNumber++;
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* row_number_to_tid maps given rowNumber to ItemPointerData.
|
||||||
|
*/
|
||||||
|
static ItemPointerData
|
||||||
|
row_number_to_tid(uint64 rowNumber)
|
||||||
|
{
|
||||||
|
if (rowNumber == COLUMNAR_INVALID_ROW_NUMBER)
|
||||||
|
{
|
||||||
|
/* not expected but be on the safe side */
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR),
|
||||||
|
errmsg("unexpected row number for columnar table")));
|
||||||
|
}
|
||||||
|
else if (rowNumber > COLUMNAR_MAX_ROW_NUMBER)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
errmsg("columnar tables can't have row numbers "
|
||||||
|
"greater than " UINT64_FORMAT,
|
||||||
|
(uint64) COLUMNAR_MAX_ROW_NUMBER),
|
||||||
|
errhint("Consider using VACUUM FULL for your table")));
|
||||||
|
}
|
||||||
|
|
||||||
|
ItemPointerData tid = { 0 };
|
||||||
|
ItemPointerSetBlockNumber(&tid, rowNumber / VALID_ITEMPOINTER_OFFSETS);
|
||||||
|
ItemPointerSetOffsetNumber(&tid, rowNumber % VALID_ITEMPOINTER_OFFSETS +
|
||||||
|
FirstOffsetNumber);
|
||||||
|
return tid;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static Size
|
static Size
|
||||||
columnar_parallelscan_estimate(Relation rel)
|
columnar_parallelscan_estimate(Relation rel)
|
||||||
{
|
{
|
||||||
|
@ -412,7 +426,8 @@ columnar_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
|
||||||
Datum *values = detoast_values(slot->tts_tupleDescriptor,
|
Datum *values = detoast_values(slot->tts_tupleDescriptor,
|
||||||
slot->tts_values, slot->tts_isnull);
|
slot->tts_values, slot->tts_isnull);
|
||||||
|
|
||||||
ColumnarWriteRow(writeState, values, slot->tts_isnull);
|
uint64 writtenRowNumber = ColumnarWriteRow(writeState, values, slot->tts_isnull);
|
||||||
|
slot->tts_tid = row_number_to_tid(writtenRowNumber);
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
MemoryContextSwitchTo(oldContext);
|
||||||
MemoryContextReset(ColumnarWritePerTupleContext(writeState));
|
MemoryContextReset(ColumnarWritePerTupleContext(writeState));
|
||||||
|
@ -458,7 +473,10 @@ columnar_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
|
||||||
Datum *values = detoast_values(tupleSlot->tts_tupleDescriptor,
|
Datum *values = detoast_values(tupleSlot->tts_tupleDescriptor,
|
||||||
tupleSlot->tts_values, tupleSlot->tts_isnull);
|
tupleSlot->tts_values, tupleSlot->tts_isnull);
|
||||||
|
|
||||||
ColumnarWriteRow(writeState, values, tupleSlot->tts_isnull);
|
uint64 writtenRowNumber = ColumnarWriteRow(writeState, values,
|
||||||
|
tupleSlot->tts_isnull);
|
||||||
|
tupleSlot->tts_tid = row_number_to_tid(writtenRowNumber);
|
||||||
|
|
||||||
MemoryContextReset(ColumnarWritePerTupleContext(writeState));
|
MemoryContextReset(ColumnarWritePerTupleContext(writeState));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -629,7 +647,8 @@ columnar_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
|
||||||
|
|
||||||
*num_tuples = 0;
|
*num_tuples = 0;
|
||||||
|
|
||||||
while (ColumnarReadNextRow(readState, values, nulls))
|
/* we don't need to know rowNumber here */
|
||||||
|
while (ColumnarReadNextRow(readState, values, nulls, NULL))
|
||||||
{
|
{
|
||||||
ColumnarWriteRow(writeState, values, nulls);
|
ColumnarWriteRow(writeState, values, nulls);
|
||||||
(*num_tuples)++;
|
(*num_tuples)++;
|
||||||
|
@ -1180,13 +1199,7 @@ ColumnarSlotCopyHeapTuple(TupleTableSlot *slot)
|
||||||
slot->tts_values,
|
slot->tts_values,
|
||||||
slot->tts_isnull);
|
slot->tts_isnull);
|
||||||
|
|
||||||
/*
|
/* slot->tts_tid is filled in columnar_getnextslot */
|
||||||
* We need to set item pointer, since implementation of ANALYZE
|
|
||||||
* requires it. See the qsort in acquire_sample_rows() and
|
|
||||||
* also compare_rows in backend/commands/analyze.c.
|
|
||||||
*
|
|
||||||
* slot->tts_tid is filled in columnar_getnextslot.
|
|
||||||
*/
|
|
||||||
tuple->t_self = slot->tts_tid;
|
tuple->t_self = slot->tts_tid;
|
||||||
|
|
||||||
return tuple;
|
return tuple;
|
||||||
|
|
|
@ -43,6 +43,7 @@ struct ColumnarWriteState
|
||||||
MemoryContext perTupleContext;
|
MemoryContext perTupleContext;
|
||||||
StripeBuffers *stripeBuffers;
|
StripeBuffers *stripeBuffers;
|
||||||
StripeSkipList *stripeSkipList;
|
StripeSkipList *stripeSkipList;
|
||||||
|
uint64 stripeFirstRowNumber;
|
||||||
ColumnarOptions options;
|
ColumnarOptions options;
|
||||||
ChunkData *chunkData;
|
ChunkData *chunkData;
|
||||||
|
|
||||||
|
@ -129,6 +130,7 @@ ColumnarBeginWrite(RelFileNode relfilenode,
|
||||||
writeState->comparisonFunctionArray = comparisonFunctionArray;
|
writeState->comparisonFunctionArray = comparisonFunctionArray;
|
||||||
writeState->stripeBuffers = NULL;
|
writeState->stripeBuffers = NULL;
|
||||||
writeState->stripeSkipList = NULL;
|
writeState->stripeSkipList = NULL;
|
||||||
|
writeState->stripeFirstRowNumber = COLUMNAR_INVALID_ROW_NUMBER;
|
||||||
writeState->stripeWriteContext = stripeWriteContext;
|
writeState->stripeWriteContext = stripeWriteContext;
|
||||||
writeState->chunkData = chunkData;
|
writeState->chunkData = chunkData;
|
||||||
writeState->compressionBuffer = NULL;
|
writeState->compressionBuffer = NULL;
|
||||||
|
@ -147,8 +149,10 @@ ColumnarBeginWrite(RelFileNode relfilenode,
|
||||||
* corresponding skip nodes. Then, whole chunk data is compressed at every
|
* corresponding skip nodes. Then, whole chunk data is compressed at every
|
||||||
* rowChunkCount insertion. Then, if row count exceeds stripeMaxRowCount, we flush
|
* rowChunkCount insertion. Then, if row count exceeds stripeMaxRowCount, we flush
|
||||||
* the stripe, and add its metadata to the table footer.
|
* the stripe, and add its metadata to the table footer.
|
||||||
|
*
|
||||||
|
* Returns the "row number" assigned to written row.
|
||||||
*/
|
*/
|
||||||
void
|
uint64
|
||||||
ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *columnNulls)
|
ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *columnNulls)
|
||||||
{
|
{
|
||||||
uint32 columnIndex = 0;
|
uint32 columnIndex = 0;
|
||||||
|
@ -170,6 +174,14 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu
|
||||||
writeState->stripeSkipList = stripeSkipList;
|
writeState->stripeSkipList = stripeSkipList;
|
||||||
writeState->compressionBuffer = makeStringInfo();
|
writeState->compressionBuffer = makeStringInfo();
|
||||||
|
|
||||||
|
Oid relationId = RelidByRelfilenode(writeState->relfilenode.spcNode,
|
||||||
|
writeState->relfilenode.relNode);
|
||||||
|
Relation relation = relation_open(relationId, NoLock);
|
||||||
|
writeState->stripeFirstRowNumber =
|
||||||
|
ColumnarStorageReserveRowNumber(relation,
|
||||||
|
options->stripeRowCount);
|
||||||
|
relation_close(relation, NoLock);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* serializedValueBuffer lives in stripe write memory context so it needs to be
|
* serializedValueBuffer lives in stripe write memory context so it needs to be
|
||||||
* initialized when the stripe is created.
|
* initialized when the stripe is created.
|
||||||
|
@ -226,6 +238,7 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu
|
||||||
SerializeChunkData(writeState, chunkIndex, chunkRowCount);
|
SerializeChunkData(writeState, chunkIndex, chunkRowCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint64 writtenRowNumber = writeState->stripeFirstRowNumber + stripeBuffers->rowCount;
|
||||||
stripeBuffers->rowCount++;
|
stripeBuffers->rowCount++;
|
||||||
if (stripeBuffers->rowCount >= options->stripeRowCount)
|
if (stripeBuffers->rowCount >= options->stripeRowCount)
|
||||||
{
|
{
|
||||||
|
@ -233,6 +246,8 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu
|
||||||
}
|
}
|
||||||
|
|
||||||
MemoryContextSwitchTo(oldContext);
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
|
||||||
|
return writtenRowNumber;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -429,7 +444,7 @@ FlushStripe(ColumnarWriteState *writeState)
|
||||||
|
|
||||||
stripeMetadata = ReserveStripe(relation, stripeSize,
|
stripeMetadata = ReserveStripe(relation, stripeSize,
|
||||||
stripeRowCount, columnCount, chunkCount,
|
stripeRowCount, columnCount, chunkCount,
|
||||||
chunkRowCount);
|
chunkRowCount, writeState->stripeFirstRowNumber);
|
||||||
|
|
||||||
uint64 currentFileOffset = stripeMetadata.fileOffset;
|
uint64 currentFileOffset = stripeMetadata.fileOffset;
|
||||||
|
|
||||||
|
|
|
@ -21,6 +21,30 @@ END$proc$;
|
||||||
-- columnar objects when upgrading postgres
|
-- columnar objects when upgrading postgres
|
||||||
DROP FUNCTION citus_internal.columnar_ensure_objects_exist();
|
DROP FUNCTION citus_internal.columnar_ensure_objects_exist();
|
||||||
|
|
||||||
|
-- For a proper mapping between tid & (stripe, row_num), add a new column to
|
||||||
|
-- columnar.stripe and define a BTREE index on this column.
|
||||||
|
-- Also include storage_id column for per-relation scans.
|
||||||
|
ALTER TABLE columnar.stripe ADD COLUMN first_row_number bigint;
|
||||||
|
CREATE INDEX stripe_first_row_number_idx ON columnar.stripe USING BTREE(storage_id, first_row_number);
|
||||||
|
|
||||||
|
-- Populate first_row_number column of columnar.stripe table.
|
||||||
|
--
|
||||||
|
-- For simplicity, we calculate MAX(row_count) value across all the stripes
|
||||||
|
-- of all the columnar tables and then use it to populate first_row_number
|
||||||
|
-- column. This would introduce some gaps however we are okay with that since
|
||||||
|
-- it's already the case with regular INSERT/COPY's.
|
||||||
|
DO $$
|
||||||
|
DECLARE
|
||||||
|
max_row_count bigint;
|
||||||
|
-- this should be equal to columnar_storage.h/COLUMNAR_FIRST_ROW_NUMBER
|
||||||
|
COLUMNAR_FIRST_ROW_NUMBER constant bigint := 1;
|
||||||
|
BEGIN
|
||||||
|
SELECT MAX(row_count) INTO max_row_count FROM columnar.stripe;
|
||||||
|
UPDATE columnar.stripe SET first_row_number = COLUMNAR_FIRST_ROW_NUMBER +
|
||||||
|
(stripe_num - 1) * max_row_count;
|
||||||
|
END;
|
||||||
|
$$;
|
||||||
|
|
||||||
#include "udfs/upgrade_columnar_storage/10.1-1.sql"
|
#include "udfs/upgrade_columnar_storage/10.1-1.sql"
|
||||||
#include "udfs/downgrade_columnar_storage/10.1-1.sql"
|
#include "udfs/downgrade_columnar_storage/10.1-1.sql"
|
||||||
|
|
||||||
|
|
|
@ -18,3 +18,7 @@ SELECT citus_internal.downgrade_columnar_storage(c.oid) FROM pg_class c, pg_am a
|
||||||
|
|
||||||
DROP FUNCTION citus_internal.upgrade_columnar_storage(regclass);
|
DROP FUNCTION citus_internal.upgrade_columnar_storage(regclass);
|
||||||
DROP FUNCTION citus_internal.downgrade_columnar_storage(regclass);
|
DROP FUNCTION citus_internal.downgrade_columnar_storage(regclass);
|
||||||
|
|
||||||
|
-- drop "first_row_number" column and the index defined on it
|
||||||
|
DROP INDEX columnar.stripe_first_row_number_idx;
|
||||||
|
ALTER TABLE columnar.stripe DROP COLUMN first_row_number;
|
||||||
|
|
|
@ -201,8 +201,8 @@ extern CompressionType ParseCompressionType(const char *compressionTypeString);
|
||||||
extern ColumnarWriteState * ColumnarBeginWrite(RelFileNode relfilenode,
|
extern ColumnarWriteState * ColumnarBeginWrite(RelFileNode relfilenode,
|
||||||
ColumnarOptions options,
|
ColumnarOptions options,
|
||||||
TupleDesc tupleDescriptor);
|
TupleDesc tupleDescriptor);
|
||||||
extern void ColumnarWriteRow(ColumnarWriteState *state, Datum *columnValues,
|
extern uint64 ColumnarWriteRow(ColumnarWriteState *state, Datum *columnValues,
|
||||||
bool *columnNulls);
|
bool *columnNulls);
|
||||||
extern void ColumnarFlushPendingWrites(ColumnarWriteState *state);
|
extern void ColumnarFlushPendingWrites(ColumnarWriteState *state);
|
||||||
extern void ColumnarEndWrite(ColumnarWriteState *state);
|
extern void ColumnarEndWrite(ColumnarWriteState *state);
|
||||||
extern bool ContainsPendingWrites(ColumnarWriteState *state);
|
extern bool ContainsPendingWrites(ColumnarWriteState *state);
|
||||||
|
@ -214,7 +214,7 @@ extern ColumnarReadState * ColumnarBeginRead(Relation relation,
|
||||||
List *projectedColumnList,
|
List *projectedColumnList,
|
||||||
List *qualConditions);
|
List *qualConditions);
|
||||||
extern bool ColumnarReadNextRow(ColumnarReadState *state, Datum *columnValues,
|
extern bool ColumnarReadNextRow(ColumnarReadState *state, Datum *columnValues,
|
||||||
bool *columnNulls);
|
bool *columnNulls, uint64 *rowNumber);
|
||||||
extern void ColumnarRescan(ColumnarReadState *readState);
|
extern void ColumnarRescan(ColumnarReadState *readState);
|
||||||
extern void ColumnarEndRead(ColumnarReadState *state);
|
extern void ColumnarEndRead(ColumnarReadState *state);
|
||||||
extern int64 ColumnarReadChunkGroupsFiltered(ColumnarReadState *state);
|
extern int64 ColumnarReadChunkGroupsFiltered(ColumnarReadState *state);
|
||||||
|
@ -241,7 +241,8 @@ extern uint64 ColumnarMetadataNewStorageId(void);
|
||||||
extern uint64 GetHighestUsedAddress(RelFileNode relfilenode);
|
extern uint64 GetHighestUsedAddress(RelFileNode relfilenode);
|
||||||
extern StripeMetadata ReserveStripe(Relation rel, uint64 size,
|
extern StripeMetadata ReserveStripe(Relation rel, uint64 size,
|
||||||
uint64 rowCount, uint64 columnCount,
|
uint64 rowCount, uint64 columnCount,
|
||||||
uint64 chunkCount, uint64 chunkGroupRowCount);
|
uint64 chunkCount, uint64 chunkGroupRowCount,
|
||||||
|
uint64 stripeFirstRowNumber);
|
||||||
extern void SaveStripeSkipList(RelFileNode relfilenode, uint64 stripe,
|
extern void SaveStripeSkipList(RelFileNode relfilenode, uint64 stripe,
|
||||||
StripeSkipList *stripeSkipList,
|
StripeSkipList *stripeSkipList,
|
||||||
TupleDesc tupleDescriptor);
|
TupleDesc tupleDescriptor);
|
||||||
|
|
|
@ -25,6 +25,7 @@ typedef struct StripeMetadata
|
||||||
uint32 chunkGroupRowCount;
|
uint32 chunkGroupRowCount;
|
||||||
uint64 rowCount;
|
uint64 rowCount;
|
||||||
uint64 id;
|
uint64 id;
|
||||||
|
uint64 firstRowNumber;
|
||||||
} StripeMetadata;
|
} StripeMetadata;
|
||||||
|
|
||||||
extern List * StripesForRelfilenode(RelFileNode relfilenode);
|
extern List * StripesForRelfilenode(RelFileNode relfilenode);
|
||||||
|
|
|
@ -17,6 +17,16 @@
|
||||||
#include "storage/smgr.h"
|
#include "storage/smgr.h"
|
||||||
#include "utils/rel.h"
|
#include "utils/rel.h"
|
||||||
|
|
||||||
|
#include "columnar/columnar_tableam.h"
|
||||||
|
|
||||||
|
|
||||||
|
#define COLUMNAR_INVALID_ROW_NUMBER ((uint64) 0)
|
||||||
|
#define COLUMNAR_FIRST_ROW_NUMBER ((uint64) 1)
|
||||||
|
#define COLUMNAR_MAX_ROW_NUMBER ((uint64) \
|
||||||
|
(COLUMNAR_FIRST_ROW_NUMBER + \
|
||||||
|
(uint64) VALID_ITEMPOINTER_OFFSETS * \
|
||||||
|
(uint64) VALID_BLOCKNUMBERS))
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Logical offsets never fall on the first two physical pages. See
|
* Logical offsets never fall on the first two physical pages. See
|
||||||
|
@ -42,8 +52,8 @@ extern uint64 ColumnarStorageGetReservedRowNumber(Relation rel, bool force);
|
||||||
extern uint64 ColumnarStorageGetReservedOffset(Relation rel, bool force);
|
extern uint64 ColumnarStorageGetReservedOffset(Relation rel, bool force);
|
||||||
|
|
||||||
extern uint64 ColumnarStorageReserveData(Relation rel, uint64 amount);
|
extern uint64 ColumnarStorageReserveData(Relation rel, uint64 amount);
|
||||||
extern uint64 ColumnarStorageReserveStripe(Relation rel, uint64 nrows,
|
extern uint64 ColumnarStorageReserveRowNumber(Relation rel, uint64 nrows);
|
||||||
uint64 *firstRowNumber);
|
extern uint64 ColumnarStorageReserveStripe(Relation rel);
|
||||||
|
|
||||||
extern void ColumnarStorageRead(Relation rel, uint64 logicalOffset,
|
extern void ColumnarStorageRead(Relation rel, uint64 logicalOffset,
|
||||||
char *data, uint32 amount);
|
char *data, uint32 amount);
|
||||||
|
|
|
@ -8,6 +8,44 @@
|
||||||
|
|
||||||
#include "distributed/coordinator_protocol.h"
|
#include "distributed/coordinator_protocol.h"
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Number of valid ItemPointer Offset's for "row number" <> "ItemPointer"
|
||||||
|
* mapping.
|
||||||
|
*
|
||||||
|
* Postgres has some asserts calling either ItemPointerIsValid or
|
||||||
|
* OffsetNumberIsValid. That constrains itemPointer.offsetNumber
|
||||||
|
* for columnar tables to the following interval:
|
||||||
|
* [FirstOffsetNumber, MaxOffsetNumber].
|
||||||
|
*
|
||||||
|
* However, for GIN indexes, Postgres also asserts the following in
|
||||||
|
* itemptr_to_uint64 function:
|
||||||
|
* "GinItemPointerGetOffsetNumber(iptr) < (1 << MaxHeapTuplesPerPageBits)",
|
||||||
|
* where MaxHeapTuplesPerPageBits = 11.
|
||||||
|
* That means, offsetNumber for columnar tables can't be equal to
|
||||||
|
* 2**11 = 2048 = MaxOffsetNumber.
|
||||||
|
* Hence we can't use MaxOffsetNumber as offsetNumber too.
|
||||||
|
*
|
||||||
|
* For this reason, we restrict itemPointer.offsetNumber
|
||||||
|
* to the following interval: [FirstOffsetNumber, MaxOffsetNumber).
|
||||||
|
*/
|
||||||
|
#define VALID_ITEMPOINTER_OFFSETS (MaxOffsetNumber - FirstOffsetNumber)
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Number of valid ItemPointer BlockNumber's for "row number" <> "ItemPointer"
|
||||||
|
* mapping.
|
||||||
|
*
|
||||||
|
* Similar to VALID_ITEMPOINTER_OFFSETS, due to asserts around
|
||||||
|
* itemPointer.blockNumber, we can only use values up to and including
|
||||||
|
* MaxBlockNumber.
|
||||||
|
* Note that postgres doesn't restrict blockNumber to a lower boundary.
|
||||||
|
*
|
||||||
|
* For this reason, we restrict itemPointer.blockNumber
|
||||||
|
* to the following interval: [0, MaxBlockNumber].
|
||||||
|
*/
|
||||||
|
#define VALID_BLOCKNUMBERS (MaxBlockNumber + 1)
|
||||||
|
|
||||||
|
|
||||||
const TableAmRoutine * GetColumnarTableAmRoutine(void);
|
const TableAmRoutine * GetColumnarTableAmRoutine(void);
|
||||||
extern void columnar_tableam_init(void);
|
extern void columnar_tableam_init(void);
|
||||||
extern void columnar_tableam_finish(void);
|
extern void columnar_tableam_finish(void);
|
||||||
|
|
|
@ -4,7 +4,7 @@ test: multi_test_catalog_views
|
||||||
|
|
||||||
test: columnar_create
|
test: columnar_create
|
||||||
test: columnar_load
|
test: columnar_load
|
||||||
test: columnar_query
|
test: columnar_query columnar_first_row_number
|
||||||
test: columnar_analyze
|
test: columnar_analyze
|
||||||
test: columnar_data_types
|
test: columnar_data_types
|
||||||
test: columnar_drop
|
test: columnar_drop
|
||||||
|
|
|
@ -17,7 +17,7 @@ select
|
||||||
from columnar_test_helpers.columnar_storage_info('test_alter_table');
|
from columnar_test_helpers.columnar_storage_info('test_alter_table');
|
||||||
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
2 | 0 | 2 | 1 | 16402
|
2 | 0 | 2 | 150001 | 16402
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- test analyze
|
-- test analyze
|
||||||
|
@ -72,7 +72,7 @@ select
|
||||||
from columnar_test_helpers.columnar_storage_info('test_alter_table');
|
from columnar_test_helpers.columnar_storage_info('test_alter_table');
|
||||||
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
2 | 0 | 4 | 1 | 32724
|
2 | 0 | 4 | 450001 | 32724
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- add a fixed-length column with default value
|
-- add a fixed-length column with default value
|
||||||
|
@ -104,7 +104,7 @@ select
|
||||||
from columnar_test_helpers.columnar_storage_info('test_alter_table');
|
from columnar_test_helpers.columnar_storage_info('test_alter_table');
|
||||||
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
2 | 0 | 5 | 1 | 40906
|
2 | 0 | 5 | 600001 | 40906
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- add a variable-length column with default value
|
-- add a variable-length column with default value
|
||||||
|
|
|
@ -0,0 +1,56 @@
|
||||||
|
CREATE SCHEMA columnar_first_row_number;
|
||||||
|
SET search_path tO columnar_first_row_number;
|
||||||
|
CREATE TABLE col_table_1 (a int) USING columnar;
|
||||||
|
INSERT INTO col_table_1 SELECT i FROM generate_series(1, 10) i;
|
||||||
|
BEGIN;
|
||||||
|
-- we don't use same first_row_number even if the xact is rollback'ed
|
||||||
|
INSERT INTO col_table_1 SELECT i FROM generate_series(1, 11) i;
|
||||||
|
ROLLBACK;
|
||||||
|
INSERT INTO col_table_1 SELECT i FROM generate_series(1, 12) i;
|
||||||
|
SELECT alter_columnar_table_set('col_table_1', stripe_row_limit => 100);
|
||||||
|
alter_columnar_table_set
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO col_table_1 SELECT i FROM generate_series(1, 235) i;
|
||||||
|
SELECT row_count, first_row_number FROM columnar.stripe a
|
||||||
|
WHERE a.storage_id = columnar_test_helpers.columnar_relation_storageid('col_table_1'::regclass)
|
||||||
|
ORDER BY stripe_num;
|
||||||
|
row_count | first_row_number
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
10 | 1
|
||||||
|
12 | 300001
|
||||||
|
100 | 450001
|
||||||
|
100 | 450101
|
||||||
|
35 | 450201
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
|
VACUUM FULL col_table_1;
|
||||||
|
-- show that we properly update first_row_number after VACUUM FULL
|
||||||
|
SELECT row_count, first_row_number FROM columnar.stripe a
|
||||||
|
WHERE a.storage_id = columnar_test_helpers.columnar_relation_storageid('col_table_1'::regclass)
|
||||||
|
ORDER BY stripe_num;
|
||||||
|
row_count | first_row_number
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
100 | 1
|
||||||
|
100 | 101
|
||||||
|
57 | 201
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
TRUNCATE col_table_1;
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO col_table_1 SELECT i FROM generate_series(1, 16) i;
|
||||||
|
INSERT INTO col_table_1 SELECT i FROM generate_series(1, 16) i;
|
||||||
|
COMMIT;
|
||||||
|
-- show that we start with first_row_number=1 after TRUNCATE
|
||||||
|
SELECT row_count, first_row_number FROM columnar.stripe a
|
||||||
|
WHERE a.storage_id = columnar_test_helpers.columnar_relation_storageid('col_table_1'::regclass)
|
||||||
|
ORDER BY stripe_num;
|
||||||
|
row_count | first_row_number
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
32 | 1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
DROP SCHEMA columnar_first_row_number CASCADE;
|
|
@ -50,7 +50,7 @@ select
|
||||||
from columnar_test_helpers.columnar_storage_info('test_insert_command');
|
from columnar_test_helpers.columnar_storage_info('test_insert_command');
|
||||||
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
2 | 0 | 4 | 1 | 32686
|
2 | 0 | 4 | 450001 | 32686
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT * FROM columnar_test_helpers.chunk_group_consistency;
|
SELECT * FROM columnar_test_helpers.chunk_group_consistency;
|
||||||
|
@ -154,7 +154,7 @@ select
|
||||||
from columnar_test_helpers.columnar_storage_info('test_toast_columnar');
|
from columnar_test_helpers.columnar_storage_info('test_toast_columnar');
|
||||||
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
2 | 0 | 2 | 1 | 16428
|
2 | 0 | 2 | 150001 | 16428
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT * FROM columnar_test_helpers.chunk_group_consistency;
|
SELECT * FROM columnar_test_helpers.chunk_group_consistency;
|
||||||
|
@ -194,7 +194,7 @@ select
|
||||||
from columnar_test_helpers.columnar_storage_info('zero_col');
|
from columnar_test_helpers.columnar_storage_info('zero_col');
|
||||||
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
2 | 0 | 6 | 1 | 16336
|
2 | 0 | 6 | 750001 | 16336
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT relname, stripe_num, chunk_group_count, row_count FROM columnar.stripe a, pg_class b
|
SELECT relname, stripe_num, chunk_group_count, row_count FROM columnar.stripe a, pg_class b
|
||||||
|
|
|
@ -19,7 +19,7 @@ select
|
||||||
from columnar_test_helpers.columnar_storage_info('t');
|
from columnar_test_helpers.columnar_storage_info('t');
|
||||||
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
2 | 0 | 1 | 1 | 16336
|
2 | 0 | 1 | 150001 | 16336
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- check stripe metadata also have been rolled-back
|
-- check stripe metadata also have been rolled-back
|
||||||
|
@ -59,7 +59,7 @@ select
|
||||||
from columnar_test_helpers.columnar_storage_info('t');
|
from columnar_test_helpers.columnar_storage_info('t');
|
||||||
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
2 | 0 | 3 | 1 | 24606
|
2 | 0 | 3 | 600001 | 24606
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT count(*) FROM t;
|
SELECT count(*) FROM t;
|
||||||
|
@ -89,7 +89,7 @@ select
|
||||||
from columnar_test_helpers.columnar_storage_info('t');
|
from columnar_test_helpers.columnar_storage_info('t');
|
||||||
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
2 | 0 | 5 | 1 | 40942
|
2 | 0 | 5 | 750001 | 40942
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT count(*) FROM t;
|
SELECT count(*) FROM t;
|
||||||
|
|
|
@ -48,7 +48,7 @@ select
|
||||||
from columnar_test_helpers.columnar_storage_info('columnar_truncate_test');
|
from columnar_test_helpers.columnar_storage_info('columnar_truncate_test');
|
||||||
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
2 | 0 | 2 | 1 | 16438
|
2 | 0 | 2 | 150001 | 16438
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
TRUNCATE TABLE columnar_truncate_test;
|
TRUNCATE TABLE columnar_truncate_test;
|
||||||
|
|
|
@ -30,7 +30,7 @@ select
|
||||||
from columnar_test_helpers.columnar_storage_info('t');
|
from columnar_test_helpers.columnar_storage_info('t');
|
||||||
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
2 | 0 | 4 | 1 | 32756
|
2 | 0 | 4 | 450001 | 32756
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- vacuum full should merge stripes together
|
-- vacuum full should merge stripes together
|
||||||
|
@ -58,7 +58,7 @@ select
|
||||||
from columnar_test_helpers.columnar_storage_info('t');
|
from columnar_test_helpers.columnar_storage_info('t');
|
||||||
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
2 | 0 | 2 | 1 | 16584
|
2 | 0 | 2 | 150001 | 16584
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- test the case when all data cannot fit into a single stripe
|
-- test the case when all data cannot fit into a single stripe
|
||||||
|
@ -87,7 +87,7 @@ select
|
||||||
from columnar_test_helpers.columnar_storage_info('t');
|
from columnar_test_helpers.columnar_storage_info('t');
|
||||||
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
2 | 0 | 4 | 1 | 53382
|
2 | 0 | 4 | 3001 | 53382
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT * FROM columnar_test_helpers.chunk_group_consistency;
|
SELECT * FROM columnar_test_helpers.chunk_group_consistency;
|
||||||
|
@ -244,7 +244,7 @@ select
|
||||||
from columnar_test_helpers.columnar_storage_info('t');
|
from columnar_test_helpers.columnar_storage_info('t');
|
||||||
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
2 | 0 | 16 | 1 | 50686
|
2 | 0 | 16 | 21001 | 50686
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT * FROM columnar_test_helpers.chunk_group_consistency;
|
SELECT * FROM columnar_test_helpers.chunk_group_consistency;
|
||||||
|
|
|
@ -140,3 +140,45 @@ a b
|
||||||
11
|
11
|
||||||
12
|
12
|
||||||
13
|
13
|
||||||
|
|
||||||
|
starting permutation: s1-truncate s1-begin s1-insert-10000-rows s2-begin s2-insert s2-commit s1-commit s1-verify-metadata
|
||||||
|
step s1-truncate:
|
||||||
|
TRUNCATE test_insert_concurrency;
|
||||||
|
|
||||||
|
step s1-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s1-insert-10000-rows:
|
||||||
|
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(1, 10000) i;
|
||||||
|
|
||||||
|
step s2-begin:
|
||||||
|
BEGIN;
|
||||||
|
|
||||||
|
step s2-insert:
|
||||||
|
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(4, 6) i;
|
||||||
|
|
||||||
|
step s2-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s1-commit:
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
step s1-verify-metadata:
|
||||||
|
WITH test_insert_concurrency_stripes AS (
|
||||||
|
SELECT first_row_number, stripe_num, row_count
|
||||||
|
FROM columnar.stripe a, pg_class b
|
||||||
|
WHERE columnar_relation_storageid(b.oid)=a.storage_id AND
|
||||||
|
relname = 'test_insert_concurrency'
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
-- verify that table has two stripes ..
|
||||||
|
count(*) = 2 AND
|
||||||
|
-- .. and those stripes look like:
|
||||||
|
sum(case when stripe_num = 1 AND first_row_number = 150001 AND row_count = 3 then 1 end) = 1 AND
|
||||||
|
sum(case when stripe_num = 2 AND first_row_number = 1 AND row_count = 10000 then 1 end) = 1
|
||||||
|
AS stripe_metadata_for_test_insert_concurrency_ok
|
||||||
|
FROM test_insert_concurrency_stripes;
|
||||||
|
|
||||||
|
stripe_metadata_for_test_insert_concurrency_ok
|
||||||
|
|
||||||
|
t
|
||||||
|
|
|
@ -260,7 +260,7 @@ ABORT;
|
||||||
-- all below 5 commands should throw no permission errors
|
-- all below 5 commands should throw no permission errors
|
||||||
-- read columnar metadata table
|
-- read columnar metadata table
|
||||||
SELECT * FROM columnar.stripe;
|
SELECT * FROM columnar.stripe;
|
||||||
storage_id | stripe_num | file_offset | data_length | column_count | chunk_row_count | row_count | chunk_group_count
|
storage_id | stripe_num | file_offset | data_length | column_count | chunk_row_count | row_count | chunk_group_count | first_row_number
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
|
|
|
@ -24,9 +24,41 @@ CREATE OR REPLACE FUNCTION columnar_storage_info(
|
||||||
reserved_offset OUT int8)
|
reserved_offset OUT int8)
|
||||||
STRICT
|
STRICT
|
||||||
LANGUAGE c AS 'citus', 'columnar_storage_info';
|
LANGUAGE c AS 'citus', 'columnar_storage_info';
|
||||||
|
CREATE VIEW columnar_table_stripe_info AS
|
||||||
|
SELECT columnar_table_storageids.relname relname,
|
||||||
|
columnar.stripe.stripe_num stripe_num,
|
||||||
|
columnar.stripe.row_count row_count,
|
||||||
|
columnar.stripe.first_row_number first_row_number
|
||||||
|
FROM columnar.stripe,
|
||||||
|
(
|
||||||
|
SELECT c.oid relid, c.relname relname, (columnar_storage_info(c.oid)).storage_id relstorageid
|
||||||
|
FROM pg_class c, pg_am a
|
||||||
|
WHERE c.relam = a.oid AND amname = 'columnar'
|
||||||
|
) columnar_table_storageids
|
||||||
|
WHERE relstorageid = columnar.stripe.storage_id;
|
||||||
SET search_path TO upgrade_columnar_metapage, public;
|
SET search_path TO upgrade_columnar_metapage, public;
|
||||||
|
-- show that first_row_number values are equal to MAX(row_count) * stripe_num + COLUMNAR_FIRST_ROW_NUMBER
|
||||||
|
SELECT * FROM columnar_table_stripe_info ORDER BY relname, stripe_num;
|
||||||
|
relname | stripe_num | row_count | first_row_number
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
columnar_table_1 | 1 | 150000 | 1
|
||||||
|
columnar_table_1 | 2 | 10000 | 150001
|
||||||
|
columnar_table_2 | 1 | 1000 | 1
|
||||||
|
columnar_table_2 | 2 | 901 | 150001
|
||||||
|
columnar_table_3 | 1 | 2 | 1
|
||||||
|
(5 rows)
|
||||||
|
|
||||||
-- should work since we upgrade metapages when upgrading schema version
|
-- should work since we upgrade metapages when upgrading schema version
|
||||||
INSERT INTO columnar_table_1 VALUES (3);
|
INSERT INTO columnar_table_1 VALUES (3);
|
||||||
|
-- state of stripe metadata for columnar_table_1 after post-upgrade insert
|
||||||
|
SELECT * FROM columnar_table_stripe_info WHERE relname = 'columnar_table_1' ORDER BY stripe_num;
|
||||||
|
relname | stripe_num | row_count | first_row_number
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
columnar_table_1 | 1 | 150000 | 1
|
||||||
|
columnar_table_1 | 2 | 10000 | 150001
|
||||||
|
columnar_table_1 | 3 | 1 | 160001
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
-- show that all columnar relation's metapage's are upgraded to "2.0"
|
-- show that all columnar relation's metapage's are upgraded to "2.0"
|
||||||
SELECT count(*)=0
|
SELECT count(*)=0
|
||||||
FROM (SELECT (columnar_storage_info(c.oid)).* t
|
FROM (SELECT (columnar_storage_info(c.oid)).* t
|
||||||
|
@ -42,13 +74,20 @@ WHERE t.version_major != 2 and t.version_minor != 0;
|
||||||
SELECT columnar_storage_info('columnar_table_1');
|
SELECT columnar_storage_info('columnar_table_1');
|
||||||
columnar_storage_info
|
columnar_storage_info
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
(2,0,10000000000,4,0,481936)
|
(2,0,10000000000,4,310001,481936)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT columnar_storage_info('columnar_table_2');
|
SELECT columnar_storage_info('columnar_table_2');
|
||||||
columnar_storage_info
|
columnar_storage_info
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
(2,0,10000000001,2,0,16350)
|
(2,0,10000000001,3,150902,26694)
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- show that no_data_columnar_table also has metapage after upgrade
|
||||||
|
SELECT columnar_storage_info('no_data_columnar_table');
|
||||||
|
columnar_storage_info
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(2,0,10000000003,1,1,16336)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- table is already upgraded, make sure that upgrade_columnar_metapage is no-op
|
-- table is already upgraded, make sure that upgrade_columnar_metapage is no-op
|
||||||
|
@ -63,6 +102,21 @@ WHERE c.relam = a.oid AND amname = 'columnar' and relname = 'columnar_table_2';
|
||||||
SELECT columnar_storage_info('columnar_table_2');
|
SELECT columnar_storage_info('columnar_table_2');
|
||||||
columnar_storage_info
|
columnar_storage_info
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
(2,0,10000000001,2,0,16350)
|
(2,0,10000000001,3,150902,26694)
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
VACUUM FULL columnar_table_2;
|
||||||
|
-- print metapage and stripe metadata after post-upgrade vacuum full
|
||||||
|
SELECT columnar_storage_info('columnar_table_2');
|
||||||
|
columnar_storage_info
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
(2,0,10000000004,3,2001,26694)
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT * FROM columnar_table_stripe_info WHERE relname = 'columnar_table_2' ORDER BY stripe_num;
|
||||||
|
relname | stripe_num | row_count | first_row_number
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
columnar_table_2 | 1 | 1000 | 1
|
||||||
|
columnar_table_2 | 2 | 901 | 1001
|
||||||
|
(2 rows)
|
||||||
|
|
||||||
|
|
|
@ -17,7 +17,15 @@ SET search_path TO upgrade_columnar_metapage, public;
|
||||||
CREATE TABLE columnar_table_1(a INT, b INT) USING columnar;
|
CREATE TABLE columnar_table_1(a INT, b INT) USING columnar;
|
||||||
INSERT INTO columnar_table_1 SELECT i FROM generate_series(160001, 320000) i;
|
INSERT INTO columnar_table_1 SELECT i FROM generate_series(160001, 320000) i;
|
||||||
CREATE TABLE columnar_table_2(b INT) USING columnar;
|
CREATE TABLE columnar_table_2(b INT) USING columnar;
|
||||||
INSERT INTO columnar_table_2 VALUES (160);
|
SELECT alter_columnar_table_set('columnar_table_2',
|
||||||
|
chunk_group_row_limit => 100,
|
||||||
|
stripe_row_limit => 1000);
|
||||||
|
alter_columnar_table_set
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO columnar_table_2 SELECT i FROM generate_series(1600, 3500) i;
|
||||||
CREATE TABLE columnar_table_3(b INT) USING columnar;
|
CREATE TABLE columnar_table_3(b INT) USING columnar;
|
||||||
INSERT INTO columnar_table_3 VALUES (1), (2);
|
INSERT INTO columnar_table_3 VALUES (1), (2);
|
||||||
CREATE TABLE no_data_columnar_table(a INT, b INT, c TEXT) USING columnar;
|
CREATE TABLE no_data_columnar_table(a INT, b INT, c TEXT) USING columnar;
|
||||||
|
|
|
@ -19,7 +19,7 @@ select
|
||||||
from columnar_test_helpers.columnar_storage_info('contestant');
|
from columnar_test_helpers.columnar_storage_info('contestant');
|
||||||
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
2 | 0 | 3 | 1 | 24742
|
2 | 0 | 3 | 300001 | 24742
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- COPY into compressed table
|
-- COPY into compressed table
|
||||||
|
@ -32,7 +32,7 @@ select
|
||||||
from columnar_test_helpers.columnar_storage_info('contestant_compressed');
|
from columnar_test_helpers.columnar_storage_info('contestant_compressed');
|
||||||
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
2 | 0 | 3 | 1 | 24704
|
2 | 0 | 3 | 300001 | 24704
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- Test column list
|
-- Test column list
|
||||||
|
|
|
@ -1,6 +1,10 @@
|
||||||
setup
|
setup
|
||||||
{
|
{
|
||||||
CREATE TABLE test_insert_concurrency (a int, b int) USING columnar;
|
CREATE TABLE test_insert_concurrency (a int, b int) USING columnar;
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION columnar_relation_storageid(relid oid) RETURNS bigint
|
||||||
|
LANGUAGE C STABLE STRICT
|
||||||
|
AS 'citus', $$columnar_relation_storageid$$;
|
||||||
}
|
}
|
||||||
|
|
||||||
teardown
|
teardown
|
||||||
|
@ -20,6 +24,11 @@ step "s1-insert"
|
||||||
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(1, 3) i;
|
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(1, 3) i;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
step "s1-insert-10000-rows"
|
||||||
|
{
|
||||||
|
INSERT INTO test_insert_concurrency SELECT i, 2 * i FROM generate_series(1, 10000) i;
|
||||||
|
}
|
||||||
|
|
||||||
step "s1-copy"
|
step "s1-copy"
|
||||||
{
|
{
|
||||||
COPY test_insert_concurrency(a) FROM PROGRAM 'seq 11 13';
|
COPY test_insert_concurrency(a) FROM PROGRAM 'seq 11 13';
|
||||||
|
@ -30,6 +39,29 @@ step "s1-select"
|
||||||
SELECT * FROM test_insert_concurrency ORDER BY a;
|
SELECT * FROM test_insert_concurrency ORDER BY a;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
step "s1-truncate"
|
||||||
|
{
|
||||||
|
TRUNCATE test_insert_concurrency;
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s1-verify-metadata"
|
||||||
|
{
|
||||||
|
WITH test_insert_concurrency_stripes AS (
|
||||||
|
SELECT first_row_number, stripe_num, row_count
|
||||||
|
FROM columnar.stripe a, pg_class b
|
||||||
|
WHERE columnar_relation_storageid(b.oid)=a.storage_id AND
|
||||||
|
relname = 'test_insert_concurrency'
|
||||||
|
)
|
||||||
|
SELECT
|
||||||
|
-- verify that table has two stripes ..
|
||||||
|
count(*) = 2 AND
|
||||||
|
-- .. and those stripes look like:
|
||||||
|
sum(case when stripe_num = 1 AND first_row_number = 150001 AND row_count = 3 then 1 end) = 1 AND
|
||||||
|
sum(case when stripe_num = 2 AND first_row_number = 1 AND row_count = 10000 then 1 end) = 1
|
||||||
|
AS stripe_metadata_for_test_insert_concurrency_ok
|
||||||
|
FROM test_insert_concurrency_stripes;
|
||||||
|
}
|
||||||
|
|
||||||
step "s1-commit"
|
step "s1-commit"
|
||||||
{
|
{
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
@ -65,3 +97,9 @@ permutation "s1-begin" "s2-begin" "s1-copy" "s2-insert" "s1-select" "s2-select"
|
||||||
|
|
||||||
# insert vs copy
|
# insert vs copy
|
||||||
permutation "s1-begin" "s2-begin" "s2-insert" "s1-copy" "s1-select" "s2-select" "s1-commit" "s2-commit" "s1-select"
|
permutation "s1-begin" "s2-begin" "s2-insert" "s1-copy" "s1-select" "s2-select" "s1-commit" "s2-commit" "s1-select"
|
||||||
|
|
||||||
|
# insert vs insert
|
||||||
|
# Start inserting rows in session 1, reserve first_row_number to be 1 for session 1 but commit session 2 before session 1.
|
||||||
|
# Then verify that while the stripe written by session 2 has the greater first_row_number, stripe written by session 1 has
|
||||||
|
# the greater stripe_num. This is because we reserve stripe_num and first_row_number at different times.
|
||||||
|
permutation "s1-truncate" "s1-begin" "s1-insert-10000-rows" "s2-begin" "s2-insert" "s2-commit" "s1-commit" "s1-verify-metadata"
|
||||||
|
|
|
@ -0,0 +1,43 @@
|
||||||
|
CREATE SCHEMA columnar_first_row_number;
|
||||||
|
SET search_path tO columnar_first_row_number;
|
||||||
|
|
||||||
|
CREATE TABLE col_table_1 (a int) USING columnar;
|
||||||
|
|
||||||
|
INSERT INTO col_table_1 SELECT i FROM generate_series(1, 10) i;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
-- we don't use same first_row_number even if the xact is rollback'ed
|
||||||
|
INSERT INTO col_table_1 SELECT i FROM generate_series(1, 11) i;
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
INSERT INTO col_table_1 SELECT i FROM generate_series(1, 12) i;
|
||||||
|
|
||||||
|
SELECT alter_columnar_table_set('col_table_1', stripe_row_limit => 100);
|
||||||
|
|
||||||
|
INSERT INTO col_table_1 SELECT i FROM generate_series(1, 235) i;
|
||||||
|
|
||||||
|
SELECT row_count, first_row_number FROM columnar.stripe a
|
||||||
|
WHERE a.storage_id = columnar_test_helpers.columnar_relation_storageid('col_table_1'::regclass)
|
||||||
|
ORDER BY stripe_num;
|
||||||
|
|
||||||
|
VACUUM FULL col_table_1;
|
||||||
|
|
||||||
|
-- show that we properly update first_row_number after VACUUM FULL
|
||||||
|
SELECT row_count, first_row_number FROM columnar.stripe a
|
||||||
|
WHERE a.storage_id = columnar_test_helpers.columnar_relation_storageid('col_table_1'::regclass)
|
||||||
|
ORDER BY stripe_num;
|
||||||
|
|
||||||
|
TRUNCATE col_table_1;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
INSERT INTO col_table_1 SELECT i FROM generate_series(1, 16) i;
|
||||||
|
INSERT INTO col_table_1 SELECT i FROM generate_series(1, 16) i;
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
|
-- show that we start with first_row_number=1 after TRUNCATE
|
||||||
|
SELECT row_count, first_row_number FROM columnar.stripe a
|
||||||
|
WHERE a.storage_id = columnar_test_helpers.columnar_relation_storageid('col_table_1'::regclass)
|
||||||
|
ORDER BY stripe_num;
|
||||||
|
|
||||||
|
SET client_min_messages TO ERROR;
|
||||||
|
DROP SCHEMA columnar_first_row_number CASCADE;
|
|
@ -21,11 +21,30 @@ CREATE OR REPLACE FUNCTION columnar_storage_info(
|
||||||
STRICT
|
STRICT
|
||||||
LANGUAGE c AS 'citus', 'columnar_storage_info';
|
LANGUAGE c AS 'citus', 'columnar_storage_info';
|
||||||
|
|
||||||
|
CREATE VIEW columnar_table_stripe_info AS
|
||||||
|
SELECT columnar_table_storageids.relname relname,
|
||||||
|
columnar.stripe.stripe_num stripe_num,
|
||||||
|
columnar.stripe.row_count row_count,
|
||||||
|
columnar.stripe.first_row_number first_row_number
|
||||||
|
FROM columnar.stripe,
|
||||||
|
(
|
||||||
|
SELECT c.oid relid, c.relname relname, (columnar_storage_info(c.oid)).storage_id relstorageid
|
||||||
|
FROM pg_class c, pg_am a
|
||||||
|
WHERE c.relam = a.oid AND amname = 'columnar'
|
||||||
|
) columnar_table_storageids
|
||||||
|
WHERE relstorageid = columnar.stripe.storage_id;
|
||||||
|
|
||||||
SET search_path TO upgrade_columnar_metapage, public;
|
SET search_path TO upgrade_columnar_metapage, public;
|
||||||
|
|
||||||
|
-- show that first_row_number values are equal to MAX(row_count) * stripe_num + COLUMNAR_FIRST_ROW_NUMBER
|
||||||
|
SELECT * FROM columnar_table_stripe_info ORDER BY relname, stripe_num;
|
||||||
|
|
||||||
-- should work since we upgrade metapages when upgrading schema version
|
-- should work since we upgrade metapages when upgrading schema version
|
||||||
INSERT INTO columnar_table_1 VALUES (3);
|
INSERT INTO columnar_table_1 VALUES (3);
|
||||||
|
|
||||||
|
-- state of stripe metadata for columnar_table_1 after post-upgrade insert
|
||||||
|
SELECT * FROM columnar_table_stripe_info WHERE relname = 'columnar_table_1' ORDER BY stripe_num;
|
||||||
|
|
||||||
-- show that all columnar relation's metapage's are upgraded to "2.0"
|
-- show that all columnar relation's metapage's are upgraded to "2.0"
|
||||||
SELECT count(*)=0
|
SELECT count(*)=0
|
||||||
FROM (SELECT (columnar_storage_info(c.oid)).* t
|
FROM (SELECT (columnar_storage_info(c.oid)).* t
|
||||||
|
@ -37,9 +56,18 @@ WHERE t.version_major != 2 and t.version_minor != 0;
|
||||||
SELECT columnar_storage_info('columnar_table_1');
|
SELECT columnar_storage_info('columnar_table_1');
|
||||||
SELECT columnar_storage_info('columnar_table_2');
|
SELECT columnar_storage_info('columnar_table_2');
|
||||||
|
|
||||||
|
-- show that no_data_columnar_table also has metapage after upgrade
|
||||||
|
SELECT columnar_storage_info('no_data_columnar_table');
|
||||||
|
|
||||||
-- table is already upgraded, make sure that upgrade_columnar_storage is a no-op
|
-- table is already upgraded, make sure that upgrade_columnar_storage is a no-op
|
||||||
SELECT citus_internal.upgrade_columnar_storage(c.oid)
|
SELECT citus_internal.upgrade_columnar_storage(c.oid)
|
||||||
FROM pg_class c, pg_am a
|
FROM pg_class c, pg_am a
|
||||||
WHERE c.relam = a.oid AND amname = 'columnar' and relname = 'columnar_table_2';
|
WHERE c.relam = a.oid AND amname = 'columnar' and relname = 'columnar_table_2';
|
||||||
|
|
||||||
SELECT columnar_storage_info('columnar_table_2');
|
SELECT columnar_storage_info('columnar_table_2');
|
||||||
|
|
||||||
|
VACUUM FULL columnar_table_2;
|
||||||
|
|
||||||
|
-- print metapage and stripe metadata after post-upgrade vacuum full
|
||||||
|
SELECT columnar_storage_info('columnar_table_2');
|
||||||
|
SELECT * FROM columnar_table_stripe_info WHERE relname = 'columnar_table_2' ORDER BY stripe_num;
|
||||||
|
|
|
@ -15,7 +15,10 @@ CREATE TABLE columnar_table_1(a INT, b INT) USING columnar;
|
||||||
INSERT INTO columnar_table_1 SELECT i FROM generate_series(160001, 320000) i;
|
INSERT INTO columnar_table_1 SELECT i FROM generate_series(160001, 320000) i;
|
||||||
|
|
||||||
CREATE TABLE columnar_table_2(b INT) USING columnar;
|
CREATE TABLE columnar_table_2(b INT) USING columnar;
|
||||||
INSERT INTO columnar_table_2 VALUES (160);
|
SELECT alter_columnar_table_set('columnar_table_2',
|
||||||
|
chunk_group_row_limit => 100,
|
||||||
|
stripe_row_limit => 1000);
|
||||||
|
INSERT INTO columnar_table_2 SELECT i FROM generate_series(1600, 3500) i;
|
||||||
|
|
||||||
CREATE TABLE columnar_table_3(b INT) USING columnar;
|
CREATE TABLE columnar_table_3(b INT) USING columnar;
|
||||||
INSERT INTO columnar_table_3 VALUES (1), (2);
|
INSERT INTO columnar_table_3 VALUES (1), (2);
|
||||||
|
|
Loading…
Reference in New Issue