Columnar: metapage changes. (#4907)

* Columnar: introduce columnar storage API.

This new API is responsible for the low-level storage details of
columnar; translating large reads and writes into individual block
reads and writes that respect the page headers and emit WAL. It's also
responsible for the columnar metapage, resource reservations (stripe
IDs, row numbers, and data), and truncation.

This new API is not used yet, but will be used in subsequent
commits.

* Columnar: add columnar_storage_info() for debugging purposes.

* Columnar: expose ColumnarMetadataNewStorageId().

* Columnar: always initialize metapage at creation time.

This avoids the complexity of dealing with tables where the metapage
has not yet been initialized.

* Columnar: columnar storage upgrade/downgrade UDFs.

Necessary upgrade/downgrade step so that new code doesn't see an old
metapage.

* Columnar: improve metadata.c comment.

* Columnar: make ColumnarMetapage internal to the storage API.

Callers should not have or need direct access to the metapage.

* Columnar: perform resource reservation using storage API.

* Columnar: implement truncate using storage API.

* Columnar: implement read/write paths with storage API.

* Columnar: add storage tests.

* Revert "Columnar: don't include stripe reservation locks in lock graph."

This reverts commit c3dcd6b9f8.

No longer needed because the columnar storage API takes care of
concurrency for resource reservation.

* Columnar: remove unnecessary lock when reserving.

No longer necessary because the columnar storage API takes care of
concurrent resource reservation.

* Add simple upgrade tests for storage/ branch

* fix multi_extension.out

Co-authored-by: Onur Tirtir <onurcantirtir@gmail.com>
pull/5052/head
jeff-davis 2021-05-05 22:36:12 -07:00 committed by Onur Tirtir
parent 7def297a3b
commit 7b9aecff21
46 changed files with 1653 additions and 422 deletions

View File

@ -12,6 +12,7 @@
#include "pg_config.h"
#include "access/nbtree.h"
#include "access/table.h"
#include "catalog/pg_am.h"
#include "catalog/pg_type.h"
#include "distributed/pg_version_constants.h"
@ -25,11 +26,13 @@
#include "utils/tuplestore.h"
#include "columnar/columnar.h"
#include "columnar/columnar_storage.h"
#include "columnar/columnar_version_compat.h"
static void MemoryContextTotals(MemoryContext context, MemoryContextCounters *counters);
PG_FUNCTION_INFO_V1(columnar_store_memory_stats);
PG_FUNCTION_INFO_V1(columnar_storage_info);
/*
@ -72,6 +75,74 @@ columnar_store_memory_stats(PG_FUNCTION_ARGS)
}
/*
 * columnar_storage_info - UDF that reports the storage-level (metapage)
 * fields of a columnar relation as a single composite row.
 *
 * DDL:
 *   CREATE OR REPLACE FUNCTION columnar_storage_info(
 *       rel regclass,
 *       version_major OUT int4,
 *       version_minor OUT int4,
 *       storage_id OUT int8,
 *       reserved_stripe_id OUT int8,
 *       reserved_row_number OUT int8,
 *       reserved_offset OUT int8)
 *     STRICT
 *     LANGUAGE c AS 'MODULE_PATHNAME', 'columnar_storage_info';
 *
 * Errors out if the result type is not a 6-column row type or if the given
 * relation is not a columnar table.
 */
Datum
columnar_storage_info(PG_FUNCTION_ARGS)
{
#define STORAGE_INFO_NATTS 6
	Oid relid = PG_GETARG_OID(0);
	TupleDesc resultDesc;

	/* resolve the row type the SQL-level declaration promised to callers */
	if (get_call_result_type(fcinfo, NULL, &resultDesc) != TYPEFUNC_COMPOSITE)
	{
		elog(ERROR, "return type must be a row type");
	}

	if (resultDesc->natts != STORAGE_INFO_NATTS)
	{
		elog(ERROR, "return type must have %d columns", STORAGE_INFO_NATTS);
	}

	/* a share lock is enough, we only read the metapage */
	Relation rel = table_open(relid, AccessShareLock);
	if (!IsColumnarTableAmTable(relid))
	{
		ereport(ERROR, (errmsg("table \"%s\" is not a columnar table",
							   RelationGetRelationName(rel))));
	}

	RelationOpenSmgr(rel);

	Datum fieldValues[STORAGE_INFO_NATTS] = { 0 };
	bool fieldIsNull[STORAGE_INFO_NATTS] = { 0 };
	int attno = 0;

	/*
	 * Every getter is called with force = true so that metapages written by
	 * a different columnar version can still be inspected.
	 *
	 * NB: the fill order below must match the OUT parameters of the DDL.
	 */
	fieldValues[attno++] = Int32GetDatum(ColumnarStorageGetVersionMajor(rel, true));
	fieldValues[attno++] = Int32GetDatum(ColumnarStorageGetVersionMinor(rel, true));
	fieldValues[attno++] = Int64GetDatum(ColumnarStorageGetStorageId(rel, true));
	fieldValues[attno++] = Int64GetDatum(ColumnarStorageGetReservedStripeId(rel, true));
	fieldValues[attno++] = Int64GetDatum(ColumnarStorageGetReservedRowNumber(rel, true));
	fieldValues[attno++] = Int64GetDatum(ColumnarStorageGetReservedOffset(rel, true));

	/* done reading; close the relation and give the lock back */
	table_close(rel, AccessShareLock);

	HeapTuple resultTuple = heap_form_tuple(resultDesc, fieldValues, fieldIsNull);

	PG_RETURN_DATUM(HeapTupleGetDatum(resultTuple));
}
/*
* MemoryContextTotals adds stats of the given memory context and its
* subtree to the given counters.

View File

@ -1,8 +1,19 @@
/*-------------------------------------------------------------------------
*
* columnar_metadata_tables.c
* columnar_metadata.c
*
* Copyright (c), Citus Data, Inc.
* Copyright (c) Citus Data, Inc.
*
* Manages metadata for columnar relations in separate, shared metadata tables
* in the "columnar" schema.
*
* * holds basic stripe information including data size and row counts
* * holds basic chunk and chunk group information like data offsets and
* min/max values (used for Chunk Group Filtering)
* * useful for fast VACUUM operations (e.g. reporting with VACUUM VERBOSE)
* * useful for stats/costing
* * TODO: maps logical row numbers to stripe IDs
* * TODO: visibility information
*
*-------------------------------------------------------------------------
*/
@ -14,6 +25,7 @@
#include "citus_version.h"
#include "columnar/columnar.h"
#include "columnar/columnar_storage.h"
#include "columnar/columnar_version_compat.h"
#include <sys/stat.h>
@ -30,7 +42,6 @@
#include "commands/sequence.h"
#include "commands/trigger.h"
#include "distributed/metadata_cache.h"
#include "distributed/resource_lock.h"
#include "executor/executor.h"
#include "executor/spi.h"
#include "miscadmin.h"
@ -48,28 +59,6 @@
#include "utils/relfilenodemap.h"
/*
* Content of the first page in main fork, which stores metadata at file
* level.
*/
typedef struct ColumnarMetapage
{
/*
* Store version of file format used, so we can detect files from
* previous versions if we change file format.
*/
int versionMajor;
int versionMinor;
/*
* Each of the metadata table rows are identified by a storageId.
* We store it also in the main fork so we can link metadata rows
* with data files.
*/
uint64 storageId;
} ColumnarMetapage;
typedef struct
{
Relation rel;
@ -80,8 +69,6 @@ static void InsertStripeMetadataRow(uint64 storageId, StripeMetadata *stripe);
static void GetHighestUsedAddressAndId(uint64 storageId,
uint64 *highestUsedAddress,
uint64 *highestUsedId);
static void LockForStripeReservation(Relation rel, LOCKMODE mode);
static void UnlockForStripeReservation(Relation rel, LOCKMODE mode);
static List * ReadDataFileStripeList(uint64 storageId, Snapshot snapshot);
static uint32 * ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32
chunkGroupCount);
@ -95,6 +82,7 @@ static Oid ColumnarChunkGroupRelationId(void);
static Oid ColumnarChunkIndexRelationId(void);
static Oid ColumnarChunkGroupIndexRelationId(void);
static Oid ColumnarNamespaceId(void);
static uint64 LookupStorageId(RelFileNode relfilenode);
static void DeleteStorageFromColumnarMetadataTable(Oid metadataTableId,
AttrNumber storageIdAtrrNumber,
Oid storageIdIndexId,
@ -107,8 +95,6 @@ static void FinishModifyRelation(ModifyState *state);
static EState * create_estate_for_relation(Relation rel);
static bytea * DatumToBytea(Datum value, Form_pg_attribute attrForm);
static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm);
static ColumnarMetapage * InitMetapage(Relation relation);
static ColumnarMetapage * ReadMetapage(RelFileNode relfilenode, bool missingOk);
static bool WriteColumnarOptions(Oid regclass, ColumnarOptions *options, bool overwrite);
PG_FUNCTION_INFO_V1(columnar_relation_storageid);
@ -423,7 +409,7 @@ SaveStripeSkipList(RelFileNode relfilenode, uint64 stripe, StripeSkipList *chunk
uint32 chunkIndex = 0;
uint32 columnCount = chunkList->columnCount;
ColumnarMetapage *metapage = ReadMetapage(relfilenode, false);
uint64 storageId = LookupStorageId(relfilenode);
Oid columnarChunkOid = ColumnarChunkRelationId();
Relation columnarChunk = table_open(columnarChunkOid, RowExclusiveLock);
ModifyState *modifyState = StartModifyRelation(columnarChunk);
@ -436,7 +422,7 @@ SaveStripeSkipList(RelFileNode relfilenode, uint64 stripe, StripeSkipList *chunk
&chunkList->chunkSkipNodeArray[columnIndex][chunkIndex];
Datum values[Natts_columnar_chunk] = {
UInt64GetDatum(metapage->storageId),
UInt64GetDatum(storageId),
Int64GetDatum(stripe),
Int32GetDatum(columnIndex + 1),
Int32GetDatum(chunkIndex),
@ -487,7 +473,7 @@ void
SaveChunkGroups(RelFileNode relfilenode, uint64 stripe,
List *chunkGroupRowCounts)
{
ColumnarMetapage *metapage = ReadMetapage(relfilenode, false);
uint64 storageId = LookupStorageId(relfilenode);
Oid columnarChunkGroupOid = ColumnarChunkGroupRelationId();
Relation columnarChunkGroup = table_open(columnarChunkGroupOid, RowExclusiveLock);
ModifyState *modifyState = StartModifyRelation(columnarChunkGroup);
@ -499,7 +485,7 @@ SaveChunkGroups(RelFileNode relfilenode, uint64 stripe,
{
int64 rowCount = lfirst_int(lc);
Datum values[Natts_columnar_chunkgroup] = {
UInt64GetDatum(metapage->storageId),
UInt64GetDatum(storageId),
Int64GetDatum(stripe),
Int32GetDatum(chunkId),
Int64GetDatum(rowCount)
@ -530,14 +516,14 @@ ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescri
uint32 columnCount = tupleDescriptor->natts;
ScanKeyData scanKey[2];
ColumnarMetapage *metapage = ReadMetapage(relfilenode, false);
uint64 storageId = LookupStorageId(relfilenode);
Oid columnarChunkOid = ColumnarChunkRelationId();
Relation columnarChunk = table_open(columnarChunkOid, AccessShareLock);
Relation index = index_open(ColumnarChunkIndexRelationId(), AccessShareLock);
ScanKeyInit(&scanKey[0], Anum_columnar_chunk_storageid,
BTEqualStrategyNumber, F_OIDEQ, UInt64GetDatum(metapage->storageId));
BTEqualStrategyNumber, F_OIDEQ, UInt64GetDatum(storageId));
ScanKeyInit(&scanKey[1], Anum_columnar_chunk_stripe,
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe));
@ -624,7 +610,7 @@ ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescri
table_close(columnarChunk, AccessShareLock);
chunkList->chunkGroupRowCounts =
ReadChunkGroupRowCounts(metapage->storageId, stripe, chunkCount);
ReadChunkGroupRowCounts(storageId, stripe, chunkCount);
return chunkList;
}
@ -729,15 +715,9 @@ InsertStripeMetadataRow(uint64 storageId, StripeMetadata *stripe)
List *
StripesForRelfilenode(RelFileNode relfilenode)
{
ColumnarMetapage *metapage = ReadMetapage(relfilenode, true);
if (metapage == NULL)
{
/* empty relation */
return NIL;
}
uint64 storageId = LookupStorageId(relfilenode);
return ReadDataFileStripeList(metapage->storageId, GetTransactionSnapshot());
return ReadDataFileStripeList(storageId, GetTransactionSnapshot());
}
@ -752,17 +732,11 @@ StripesForRelfilenode(RelFileNode relfilenode)
uint64
GetHighestUsedAddress(RelFileNode relfilenode)
{
uint64 storageId = LookupStorageId(relfilenode);
uint64 highestUsedAddress = 0;
uint64 highestUsedId = 0;
ColumnarMetapage *metapage = ReadMetapage(relfilenode, true);
/* empty data file? */
if (metapage == NULL)
{
return 0;
}
GetHighestUsedAddressAndId(metapage->storageId, &highestUsedAddress, &highestUsedId);
GetHighestUsedAddressAndId(storageId, &highestUsedAddress, &highestUsedId);
return highestUsedAddress;
}
@ -799,35 +773,6 @@ GetHighestUsedAddressAndId(uint64 storageId,
}
/*
* LockForStripeReservation acquires a lock for stripe reservation.
*/
static void
LockForStripeReservation(Relation rel, LOCKMODE mode)
{
/*
* We use an advisory lock here so we can easily detect these kind of
* locks in IsProcessWaitingForSafeOperations() and don't include them
* in the lock graph.
*/
LOCKTAG tag;
SET_LOCKTAG_COLUMNAR_STRIPE_RESERVATION(tag, rel);
LockAcquire(&tag, mode, false, false);
}
/*
* UnlockForStripeReservation releases the stripe reservation lock.
*/
static void
UnlockForStripeReservation(Relation rel, LOCKMODE mode)
{
LOCKTAG tag;
SET_LOCKTAG_COLUMNAR_STRIPE_RESERVATION(tag, rel);
LockRelease(&tag, mode, false);
}
/*
* ReserveStripe reserves a stripe of given size for the given relation,
* and inserts it into columnar.stripe. It is guaranteed that concurrent
@ -839,47 +784,16 @@ ReserveStripe(Relation rel, uint64 sizeBytes,
uint64 chunkCount, uint64 chunkGroupRowCount)
{
StripeMetadata stripe = { 0 };
uint64 currLogicalHigh = 0;
uint64 highestId = 0;
uint64 storageId = ColumnarStorageGetStorageId(rel, false);
/*
* We take ExclusiveLock here, so two space reservations conflict.
* TODO: For now, we don't use row number reservation at all, so just use
* dummy values.
*/
LOCKMODE lockMode = ExclusiveLock;
LockForStripeReservation(rel, lockMode);
RelFileNode relfilenode = rel->rd_node;
/*
* If this is the first stripe for this relation, initialize the
* metapage, otherwise use the previously initialized metapage.
*/
ColumnarMetapage *metapage = ReadMetapage(relfilenode, true);
if (metapage == NULL)
{
metapage = InitMetapage(rel);
}
GetHighestUsedAddressAndId(metapage->storageId, &currLogicalHigh, &highestId);
SmgrAddr currSmgrHigh = logical_to_smgr(currLogicalHigh);
SmgrAddr resSmgrStart = next_block_start(currSmgrHigh);
uint64 resLogicalStart = smgr_to_logical(resSmgrStart);
uint64 resLogicalEnd = resLogicalStart + sizeBytes - 1;
SmgrAddr resSmgrEnd = logical_to_smgr(resLogicalEnd);
RelationOpenSmgr(rel);
uint64 nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
while (resSmgrEnd.blockno >= nblocks)
{
Buffer newBuffer = ReadBuffer(rel, P_NEW);
ReleaseBuffer(newBuffer);
nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
}
RelationCloseSmgr(rel);
uint64 firstReservedRow;
uint64 stripeId = ColumnarStorageReserveStripe(rel, 0, &firstReservedRow);
uint64 resLogicalStart = ColumnarStorageReserveData(rel, sizeBytes);
stripe.fileOffset = resLogicalStart;
stripe.dataLength = sizeBytes;
@ -887,11 +801,9 @@ ReserveStripe(Relation rel, uint64 sizeBytes,
stripe.chunkGroupRowCount = chunkGroupRowCount;
stripe.columnCount = columnCount;
stripe.rowCount = rowCount;
stripe.id = highestId + 1;
stripe.id = stripeId;
InsertStripeMetadataRow(metapage->storageId, &stripe);
UnlockForStripeReservation(rel, lockMode);
InsertStripeMetadataRow(storageId, &stripe);
return stripe;
}
@ -970,28 +882,20 @@ DeleteMetadataRows(RelFileNode relfilenode)
return;
}
ColumnarMetapage *metapage = ReadMetapage(relfilenode, true);
if (metapage == NULL)
{
/*
* No data has been written to this storage yet, so there is no
* associated metadata yet.
*/
return;
}
uint64 storageId = LookupStorageId(relfilenode);
DeleteStorageFromColumnarMetadataTable(ColumnarStripeRelationId(),
Anum_columnar_stripe_storageid,
ColumnarStripeIndexRelationId(),
metapage->storageId);
storageId);
DeleteStorageFromColumnarMetadataTable(ColumnarChunkGroupRelationId(),
Anum_columnar_chunkgroup_storageid,
ColumnarChunkGroupIndexRelationId(),
metapage->storageId);
storageId);
DeleteStorageFromColumnarMetadataTable(ColumnarChunkRelationId(),
Anum_columnar_chunk_storageid,
ColumnarChunkIndexRelationId(),
metapage->storageId);
storageId);
}
@ -1312,75 +1216,31 @@ ColumnarNamespaceId(void)
/*
* ReadMetapage reads metapage for the given relfilenode. It returns
* LookupStorageId reads the storage metapage to find the storage ID for the
* given relfilenode. It errors out if the metapage is not the current version.
*/
static ColumnarMetapage *
ReadMetapage(RelFileNode relfilenode, bool missingOk)
static uint64
LookupStorageId(RelFileNode relfilenode)
{
StringInfo metapageBuffer = NULL;
Oid relationId = RelidByRelfilenode(relfilenode.spcNode,
relfilenode.relNode);
if (OidIsValid(relationId))
{
Relation relation = relation_open(relationId, NoLock);
RelationOpenSmgr(relation);
int nblocks = smgrnblocks(relation->rd_smgr, MAIN_FORKNUM);
RelationCloseSmgr(relation);
Relation relation = relation_open(relationId, AccessShareLock);
uint64 storageId = ColumnarStorageGetStorageId(relation, false);
table_close(relation, AccessShareLock);
if (nblocks != 0)
{
metapageBuffer = ReadFromSmgr(relation, 0, sizeof(ColumnarMetapage));
}
relation_close(relation, NoLock);
}
if (metapageBuffer == NULL)
{
if (!missingOk)
{
elog(ERROR, "columnar metapage was not found");
}
return NULL;
}
ColumnarMetapage *metapage = palloc0(sizeof(ColumnarMetapage));
memcpy_s((void *) metapage, sizeof(ColumnarMetapage),
metapageBuffer->data, sizeof(ColumnarMetapage));
return metapage;
return storageId;
}
/*
* InitMetapage initializes metapage for the given relation.
* ColumnarMetadataNewStorageId - create a new, unique storage id and return
* it.
*/
static ColumnarMetapage *
InitMetapage(Relation relation)
uint64
ColumnarMetadataNewStorageId()
{
/*
* If we init metapage during upgrade, we might override the
* pre-upgrade storage id which will render pre-upgrade data
* invisible.
*/
Assert(!IsBinaryUpgrade);
ColumnarMetapage *metapage = palloc0(sizeof(ColumnarMetapage));
metapage->storageId = nextval_internal(ColumnarStorageIdSequenceRelationId(), false);
metapage->versionMajor = COLUMNAR_VERSION_MAJOR;
metapage->versionMinor = COLUMNAR_VERSION_MINOR;
/* create the first block */
Buffer newBuffer = ReadBuffer(relation, P_NEW);
ReleaseBuffer(newBuffer);
Assert(sizeof(ColumnarMetapage) <= BLCKSZ - SizeOfPageHeaderData);
WriteToSmgr(relation, 0, (char *) metapage, sizeof(ColumnarMetapage));
return metapage;
return nextval_internal(ColumnarStorageIdSequenceRelationId(), false);
}
@ -1391,20 +1251,53 @@ InitMetapage(Relation relation)
Datum
columnar_relation_storageid(PG_FUNCTION_ARGS)
{
uint64 storageId = -1;
Oid relationId = PG_GETARG_OID(0);
Relation relation = relation_open(relationId, AccessShareLock);
if (IsColumnarTableAmTable(relationId))
if (!IsColumnarTableAmTable(relationId))
{
ColumnarMetapage *metadata = ReadMetapage(relation->rd_node, true);
if (metadata != NULL)
{
storageId = metadata->storageId;
}
elog(ERROR, "relation \"%s\" is not a columnar table",
RelationGetRelationName(relation));
}
uint64 storageId = ColumnarStorageGetStorageId(relation, false);
relation_close(relation, AccessShareLock);
PG_RETURN_INT64(storageId);
}
/*
 * ColumnarStorageUpdateIfNeeded - bring the columnar storage of the given
 * relation to the current version, deriving the reservation fields from the
 * metadata tables.
 *
 * Does nothing when the storage is already current. When the relation has
 * fewer than two blocks, a fresh metapage with a new storage id is
 * initialized instead. 'isUpgrade' is passed through to
 * ColumnarStorageUpdateCurrent.
 */
void
ColumnarStorageUpdateIfNeeded(Relation rel, bool isUpgrade)
{
	if (ColumnarStorageIsCurrent(rel))
	{
		return;
	}

	RelationOpenSmgr(rel);
	if (smgrnblocks(rel->rd_smgr, MAIN_FORKNUM) < 2)
	{
		/* no usable metapage yet, so create one from scratch */
		ColumnarStorageInit(rel->rd_smgr, ColumnarMetadataNewStorageId());
		return;
	}

	/* force = true: the metapage is known not to be the current version */
	uint64 storageId = ColumnarStorageGetStorageId(rel, true);

	uint64 highestUsedStripeId;
	uint64 highestUsedOffset;
	GetHighestUsedAddressAndId(storageId, &highestUsedOffset, &highestUsedStripeId);

	/* the next free stripe id / offset come right after the highest used ones */
	uint64 nextStripeId = highestUsedStripeId + 1;
	uint64 nextOffset = highestUsedOffset + 1;

	/* XXX: should be set properly */
	uint64 nextRowNumber = 0;

	ColumnarStorageUpdateCurrent(rel, isUpgrade, nextStripeId,
								 nextRowNumber, nextOffset);
}

View File

@ -34,6 +34,7 @@
#include "utils/rel.h"
#include "columnar/columnar.h"
#include "columnar/columnar_storage.h"
#include "columnar/columnar_version_compat.h"
typedef struct ChunkGroupReadState
@ -667,8 +668,12 @@ LoadColumnBuffers(Relation relation, ColumnChunkSkipNode *chunkSkipNodeArray,
{
ColumnChunkSkipNode *chunkSkipNode = &chunkSkipNodeArray[chunkIndex];
uint64 existsOffset = stripeOffset + chunkSkipNode->existsChunkOffset;
StringInfo rawExistsBuffer = ReadFromSmgr(relation, existsOffset,
chunkSkipNode->existsLength);
StringInfo rawExistsBuffer = makeStringInfo();
enlargeStringInfo(rawExistsBuffer, chunkSkipNode->existsLength);
rawExistsBuffer->len = chunkSkipNode->existsLength;
ColumnarStorageRead(relation, existsOffset, rawExistsBuffer->data,
chunkSkipNode->existsLength);
chunkBuffersArray[chunkIndex]->existsBuffer = rawExistsBuffer;
}
@ -679,8 +684,12 @@ LoadColumnBuffers(Relation relation, ColumnChunkSkipNode *chunkSkipNodeArray,
ColumnChunkSkipNode *chunkSkipNode = &chunkSkipNodeArray[chunkIndex];
CompressionType compressionType = chunkSkipNode->valueCompressionType;
uint64 valueOffset = stripeOffset + chunkSkipNode->valueChunkOffset;
StringInfo rawValueBuffer = ReadFromSmgr(relation, valueOffset,
chunkSkipNode->valueLength);
StringInfo rawValueBuffer = makeStringInfo();
enlargeStringInfo(rawValueBuffer, chunkSkipNode->valueLength);
rawValueBuffer->len = chunkSkipNode->valueLength;
ColumnarStorageRead(relation, valueOffset, rawValueBuffer->data,
chunkSkipNode->valueLength);
chunkBuffersArray[chunkIndex]->valueBuffer = rawValueBuffer;
chunkBuffersArray[chunkIndex]->valueCompressionType = compressionType;
@ -1269,30 +1278,3 @@ ColumnDefaultValue(TupleConstr *tupleConstraints, Form_pg_attribute attributeFor
"does not evaluate to constant value")));
}
}
StringInfo
ReadFromSmgr(Relation rel, uint64 offset, uint32 size)
{
StringInfo resultBuffer = makeStringInfo();
uint64 read = 0;
enlargeStringInfo(resultBuffer, size);
resultBuffer->len = size;
while (read < size)
{
SmgrAddr addr = logical_to_smgr(offset + read);
Buffer buffer = ReadBuffer(rel, addr.blockno);
Page page = BufferGetPage(buffer);
PageHeader phdr = (PageHeader) page;
uint32 to_read = Min(size - read, phdr->pd_upper - addr.offset);
memcpy_s(resultBuffer->data + read, size - read, page + addr.offset, to_read);
ReleaseBuffer(buffer);
read += to_read;
}
return resultBuffer;
}

View File

@ -0,0 +1,762 @@
/*-------------------------------------------------------------------------
*
* columnar_storage.c
*
* Copyright (c) Citus Data, Inc.
*
* Low-level storage layer for columnar.
* - Translates columnar read/write operations on logical offsets into operations on pages/blocks.
* - Emits WAL.
* - Reads/writes the columnar metapage.
* - Reserves data offsets, stripe numbers, and row offsets.
* - Truncation.
*
* Higher-level columnar operations deal with logical offsets and large
* contiguous buffers of data that need to be stored. But the buffer manager
* and WAL depend on formatted pages with headers, so these large buffers need
* to be written across many pages. This module translates the contiguous
* buffers into individual block reads/writes, and performs WAL when
* necessary.
*
* Storage layout: a metapage in block 0, followed by an empty page in block
* 1, followed by logical data starting at the first byte after the page
* header in block 2 (having logical offset ColumnarFirstLogicalOffset). (XXX:
* Block 1 is left empty for no particular reason. Reconsider?). A columnar
* table should always have at least 2 blocks.
*
* Reservation is done with a relation extension lock, and designed for
* concurrency, so the callers only need an ordinary lock on the
* relation. Initializing the metapage or truncating the relation require that
* the caller holds an AccessExclusiveLock. (XXX: New reservations of data are
* aligned onto a new page for no particular reason. Reconsider?).
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "safe_lib.h"
#include "catalog/storage.h"
#include "miscadmin.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "columnar/columnar.h"
#include "columnar/columnar_storage.h"
/*
 * Content of the first page in main fork, which stores metadata at file
 * level.
 *
 * The struct is copied byte-for-byte into the metapage right after the page
 * header, so its layout is part of the storage format.
 */
typedef struct ColumnarMetapage
{
/*
 * Store version of file format used, so we can detect files from
 * previous versions if we change file format.
 */
uint32 versionMajor;
uint32 versionMinor;
/*
 * Each of the metadata table rows are identified by a storageId.
 * We store it also in the main fork so we can link metadata rows
 * with data files.
 */
uint64 storageId;
/*
 * Reservation counters: each holds the next value that will be handed out
 * by the reservation functions of this file.
 */
uint64 reservedStripeId; /* first unused stripe id */
uint64 reservedRowNumber; /* first unused row number */
uint64 reservedOffset; /* first unused byte offset */
} ColumnarMetapage;
/*
 * Represents a "physical" block+offset address.
 *
 * As produced by LogicalToPhysical, 'offset' is counted from the start of
 * the page and therefore already includes the page header.
 */
typedef struct PhysicalAddr
{
BlockNumber blockno; /* block within the main fork */
uint32 offset; /* byte position within that block */
} PhysicalAddr;
/* block that holds the columnar metapage */
#define COLUMNAR_METAPAGE_BLOCKNO 0
/* block that ColumnarStorageInit writes as an empty page */
#define COLUMNAR_EMPTY_BLOCKNO 1
/* stripe ids: a fresh metapage starts reserving at COLUMNAR_FIRST_STRIPE_ID */
/* NOTE(review): 0 is presumably never handed out as a stripe id - confirm */
#define COLUMNAR_INVALID_STRIPE_ID 0
#define COLUMNAR_FIRST_STRIPE_ID 1
/* row numbers: a fresh metapage starts reserving at COLUMNAR_FIRST_ROW_NUMBER */
/* NOTE(review): 0 is presumably never handed out as a row number - confirm */
#define COLUMNAR_INVALID_ROW_NUMBER 0
#define COLUMNAR_FIRST_ROW_NUMBER 1
/*
 * LogicalToPhysical - translate a logical offset into the block that holds
 * it and the byte position inside that block. The returned in-page offset
 * skips over the page header.
 */
static inline PhysicalAddr
LogicalToPhysical(uint64 logicalOffset)
{
	PhysicalAddr physical = {
		.blockno = logicalOffset / COLUMNAR_BYTES_PER_PAGE,
		.offset = SizeOfPageHeaderData + (logicalOffset % COLUMNAR_BYTES_PER_PAGE)
	};

	return physical;
}
/*
 * PhysicalToLogical - map a physical page and offset address to a logical
 * address. This is the inverse of LogicalToPhysical: 'addr.offset' is
 * expected to include the page header, which is subtracted again here.
 *
 * The block number is widened to uint64 before multiplying so that the
 * product is always computed in 64 bits, even if COLUMNAR_BYTES_PER_PAGE
 * expands to a narrower type (e.g. a 32-bit size_t); otherwise large block
 * numbers could wrap around before being returned as uint64.
 */
static inline uint64
PhysicalToLogical(PhysicalAddr addr)
{
	uint64 pageStart = (uint64) addr.blockno * COLUMNAR_BYTES_PER_PAGE;

	return pageStart + addr.offset - SizeOfPageHeaderData;
}
/* forward declarations of file-local helpers */

/* metapage and single-block access */
static ColumnarMetapage ColumnarMetapageRead(Relation rel, bool force);
static void ReadFromBlock(Relation rel, BlockNumber blockno, uint32 offset,
char *buf, uint32 len, bool force);
static void WriteToBlock(Relation rel, BlockNumber blockno, uint32 offset,
char *buf, uint32 len, bool clear);
/* used by ColumnarStorageReserveData to position a new reservation */
static uint64 AlignReservation(uint64 prevReservation);
/* metapage version checks */
static bool ColumnarMetapageIsCurrent(ColumnarMetapage *metapage);
static bool ColumnarMetapageIsOlder(ColumnarMetapage *metapage);
static bool ColumnarMetapageIsNewer(ColumnarMetapage *metapage);
static void ColumnarMetapageCheckVersion(Relation rel, ColumnarMetapage *metapage);
/*
 * ColumnarStorageInit - initialize a new metapage in an empty relation
 * with the given storageId.
 *
 * Two pages are created directly through the storage manager (bypassing
 * shared buffers): the metapage in block COLUMNAR_METAPAGE_BLOCKNO and an
 * empty page in block COLUMNAR_EMPTY_BLOCKNO. Both are WAL-logged as new
 * pages and the fork is synced before returning.
 *
 * Errors out if the main fork already contains any block.
 *
 * Caller must hold AccessExclusiveLock on the relation.
 */
void
ColumnarStorageInit(SMgrRelation srel, uint64 storageId)
{
	BlockNumber nblocks = smgrnblocks(srel, MAIN_FORKNUM);

	if (nblocks > 0)
	{
		/* BlockNumber is an unsigned type, hence %u */
		elog(ERROR,
			 "attempted to initialize metapage, but %u pages already exist",
			 nblocks);
	}

	/* create two pages */
	PGAlignedBlock block;
	Page page = block.data;

	/* write metapage */
	PageInit(page, BLCKSZ, 0);
	PageHeader phdr = (PageHeader) page;

	ColumnarMetapage metapage = { 0 };
	metapage.storageId = storageId;
	metapage.versionMajor = COLUMNAR_VERSION_MAJOR;
	metapage.versionMinor = COLUMNAR_VERSION_MINOR;
	metapage.reservedStripeId = COLUMNAR_FIRST_STRIPE_ID;
	metapage.reservedRowNumber = COLUMNAR_FIRST_ROW_NUMBER;
	metapage.reservedOffset = ColumnarFirstLogicalOffset;

	/* the metapage content lives right after the page header */
	memcpy_s(page + phdr->pd_lower, phdr->pd_upper - phdr->pd_lower,
			 (char *) &metapage, sizeof(ColumnarMetapage));
	phdr->pd_lower += sizeof(ColumnarMetapage);

	/*
	 * The relation is empty (checked above), so both blocks lie past the
	 * current end of file. Such blocks must be added with smgrextend();
	 * smgrwrite() is only meant for overwriting blocks that already exist.
	 */
	PageSetChecksumInplace(page, COLUMNAR_METAPAGE_BLOCKNO);
	smgrextend(srel, MAIN_FORKNUM, COLUMNAR_METAPAGE_BLOCKNO, page, true);
	log_newpage(&srel->smgr_rnode.node, MAIN_FORKNUM,
				COLUMNAR_METAPAGE_BLOCKNO, page, true);

	/* write empty page */
	PageInit(page, BLCKSZ, 0);
	PageSetChecksumInplace(page, COLUMNAR_EMPTY_BLOCKNO);
	smgrextend(srel, MAIN_FORKNUM, COLUMNAR_EMPTY_BLOCKNO, page, true);
	log_newpage(&srel->smgr_rnode.node, MAIN_FORKNUM,
				COLUMNAR_EMPTY_BLOCKNO, page, true);

	/*
	 * An immediate sync is required even if we xlog'd the page, because the
	 * write did not go through shared_buffers and therefore a concurrent
	 * checkpoint may have moved the redo pointer past our xlog record.
	 */
	smgrimmedsync(srel, MAIN_FORKNUM);
}
/*
 * ColumnarStorageUpdateCurrent - update the metapage to the current
 * version. No effect if the version already matches. If 'upgrade' is true,
 * throw an error if metapage version is newer; if 'upgrade' is false, it's a
 * downgrade, so throw an error if the metapage version is older.
 *
 * On a successful update the storage id is kept and the three reservation
 * fields are overwritten with the given values.
 *
 * The relation extension lock is taken for the duration of the update and
 * is released again on every non-error return path.
 *
 * NB: caller must ensure that metapage already exists, which might not be the
 * case on 10.0.
 */
void
ColumnarStorageUpdateCurrent(Relation rel, bool upgrade, uint64 reservedStripeId,
							 uint64 reservedRowNumber, uint64 reservedOffset)
{
	LockRelationForExtension(rel, ExclusiveLock);

	/* force = true: the whole point is to read a non-current metapage */
	ColumnarMetapage metapage = ColumnarMetapageRead(rel, true);

	if (ColumnarMetapageIsCurrent(&metapage))
	{
		/* nothing to do, but do not leave the extension lock held */
		UnlockRelationForExtension(rel, ExclusiveLock);
		return;
	}

	if (upgrade && ColumnarMetapageIsNewer(&metapage))
	{
		elog(ERROR, "found newer columnar metapage while upgrading");
	}

	if (!upgrade && ColumnarMetapageIsOlder(&metapage))
	{
		elog(ERROR, "found older columnar metapage while downgrading");
	}

	metapage.versionMajor = COLUMNAR_VERSION_MAJOR;
	metapage.versionMinor = COLUMNAR_VERSION_MINOR;

	/* storageId remains the same */
	metapage.reservedStripeId = reservedStripeId;
	metapage.reservedRowNumber = reservedRowNumber;
	metapage.reservedOffset = reservedOffset;

	WriteToBlock(rel, COLUMNAR_METAPAGE_BLOCKNO, SizeOfPageHeaderData,
				 (char *) &metapage, sizeof(ColumnarMetapage),
				 true /* clear because we are overwriting */);

	UnlockRelationForExtension(rel, ExclusiveLock);
}
/*
 * ColumnarStorageGetVersionMajor - fetch the format's major version as
 * recorded in the relation's metapage.
 *
 * With 'force' set to false, a metapage that is not the current version
 * raises an error instead.
 */
uint64
ColumnarStorageGetVersionMajor(Relation rel, bool force)
{
	return ColumnarMetapageRead(rel, force).versionMajor;
}
/*
 * ColumnarStorageGetVersionMinor - fetch the format's minor version as
 * recorded in the relation's metapage.
 *
 * With 'force' set to false, a metapage that is not the current version
 * raises an error instead.
 */
uint64
ColumnarStorageGetVersionMinor(Relation rel, bool force)
{
	return ColumnarMetapageRead(rel, force).versionMinor;
}
/*
 * ColumnarStorageGetStorageId - fetch the storage ID recorded in the
 * relation's metapage.
 *
 * With 'force' set to false, a metapage that is not the current version
 * raises an error instead.
 */
uint64
ColumnarStorageGetStorageId(Relation rel, bool force)
{
	return ColumnarMetapageRead(rel, force).storageId;
}
/*
 * ColumnarStorageGetReservedStripeId - fetch the reserved stripe ID (the
 * first unused one) recorded in the relation's metapage.
 *
 * With 'force' set to false, a metapage that is not the current version
 * raises an error instead.
 */
uint64
ColumnarStorageGetReservedStripeId(Relation rel, bool force)
{
	return ColumnarMetapageRead(rel, force).reservedStripeId;
}
/*
 * ColumnarStorageGetReservedRowNumber - fetch the reserved row number (the
 * first unused one) recorded in the relation's metapage.
 *
 * With 'force' set to false, a metapage that is not the current version
 * raises an error instead.
 */
uint64
ColumnarStorageGetReservedRowNumber(Relation rel, bool force)
{
	return ColumnarMetapageRead(rel, force).reservedRowNumber;
}
/*
 * ColumnarStorageGetReservedOffset - fetch the reserved offset (the first
 * unused byte offset) recorded in the relation's metapage.
 *
 * With 'force' set to false, a metapage that is not the current version
 * raises an error instead.
 */
uint64
ColumnarStorageGetReservedOffset(Relation rel, bool force)
{
	return ColumnarMetapageRead(rel, force).reservedOffset;
}
/*
 * ColumnarStorageIsCurrent - return true if the relation has a metapage and
 * that metapage is the current version.
 *
 * A relation with fewer than two blocks is reported as not current.
 */
bool
ColumnarStorageIsCurrent(Relation rel)
{
	RelationOpenSmgr(rel);

	if (smgrnblocks(rel->rd_smgr, MAIN_FORKNUM) < 2)
	{
		return false;
	}

	/* force = true: a non-current metapage is a valid answer, not an error */
	ColumnarMetapage metapage = ColumnarMetapageRead(rel, true);

	return ColumnarMetapageIsCurrent(&metapage);
}
/*
 * ColumnarStorageReserveStripe - hand out the next stripe ID together with a
 * run of 'nrows' row numbers.
 *
 * The returned value is the reserved stripe ID; the first reserved row number
 * is stored in *firstRowNumber. Both counters in the metapage are advanced
 * while holding the relation extension lock.
 */
uint64
ColumnarStorageReserveStripe(Relation rel, uint64 nrows, uint64 *firstRowNumber)
{
	LockRelationForExtension(rel, ExclusiveLock);

	ColumnarMetapage metapage = ColumnarMetapageRead(rel, false);

	/* hand out the current values ... */
	uint64 reservedStripeId = metapage.reservedStripeId;
	*firstRowNumber = metapage.reservedRowNumber;

	/* ... and move both counters past what was just reserved */
	metapage.reservedStripeId = reservedStripeId + 1;
	metapage.reservedRowNumber = metapage.reservedRowNumber + nrows;

	WriteToBlock(rel, COLUMNAR_METAPAGE_BLOCKNO, SizeOfPageHeaderData,
				 (char *) &metapage, sizeof(ColumnarMetapage),
				 true /* clear because we are overwriting */);

	UnlockRelationForExtension(rel, ExclusiveLock);

	return reservedStripeId;
}
/*
 * ColumnarStorageReserveData - reserve 'amount' bytes of logical data
 * offsets for writing and return the first reserved logical offset.
 *
 * A request for zero bytes reserves nothing and returns
 * ColumnarInvalidLogicalOffset. Otherwise the reservation starts at the
 * aligned position following the previous reservation, the metapage is
 * updated, and the relation is extended so that every reserved byte is
 * backed by a block. All of this happens under the relation extension lock.
 */
uint64
ColumnarStorageReserveData(Relation rel, uint64 amount)
{
	if (amount == 0)
	{
		return ColumnarInvalidLogicalOffset;
	}

	LockRelationForExtension(rel, ExclusiveLock);

	ColumnarMetapage metapage = ColumnarMetapageRead(rel, false);

	uint64 reservationStart = AlignReservation(metapage.reservedOffset);
	uint64 reservationEnd = reservationStart + amount; /* first byte NOT reserved */

	/* write new reservation */
	metapage.reservedOffset = reservationEnd;
	WriteToBlock(rel, COLUMNAR_METAPAGE_BLOCKNO, SizeOfPageHeaderData,
				 (char *) &metapage, sizeof(ColumnarMetapage),
				 true /* clear because we are overwriting */);

	/* block holding the last byte of the new reservation */
	BlockNumber lastReservedBlock = LogicalToPhysical(reservationEnd - 1).blockno;

	/* extend with new pages until that block exists */
	RelationOpenSmgr(rel);
	for (BlockNumber nextBlock = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
		 nextBlock <= lastReservedBlock;
		 nextBlock++)
	{
		Buffer newBuffer = ReadBuffer(rel, P_NEW);
		Assert(BufferGetBlockNumber(newBuffer) == nextBlock);
		ReleaseBuffer(newBuffer);
	}

	UnlockRelationForExtension(rel, ExclusiveLock);

	return reservationStart;
}
/*
 * ColumnarStorageRead - map the logical offset to a block and offset, then
 * read the buffer from multiple blocks if necessary.
 *
 * Copies 'amount' bytes starting at 'logicalOffset' into 'data'. A zero
 * 'amount' returns immediately, even when the offset is invalid; otherwise
 * an invalid logical offset raises an error.
 */
void
ColumnarStorageRead(Relation rel, uint64 logicalOffset, char *data, uint32 amount)
{
	/* if there's no work to do, succeed even with invalid offset */
	if (amount == 0)
	{
		return;
	}

	if (!ColumnarLogicalOffsetIsValid(logicalOffset))
	{
		/* rd_id is an Oid (unsigned), so it is printed with %u */
		elog(ERROR,
			 "attempted columnar read on relation %u from invalid logical offset: "
			 UINT64_FORMAT,
			 rel->rd_id, logicalOffset);
	}

	/* bytes copied so far; named to avoid shadowing read() */
	uint64 nread = 0;

	while (nread < amount)
	{
		PhysicalAddr addr = LogicalToPhysical(logicalOffset + nread);

		/* never read past the end of the current block */
		uint32 to_read = Min(amount - nread, BLCKSZ - addr.offset);

		ReadFromBlock(rel, addr.blockno, addr.offset, data + nread, to_read,
					  false);
		nread += to_read;
	}
}
/*
 * ColumnarStorageWrite - map the logical offset to a block and offset, then
 * write the buffer across multiple blocks if necessary.
 *
 * Writes 'amount' bytes from 'data' starting at 'logicalOffset'. A zero
 * 'amount' returns immediately, even when the offset is invalid; otherwise
 * an invalid logical offset raises an error.
 */
void
ColumnarStorageWrite(Relation rel, uint64 logicalOffset, char *data, uint32 amount)
{
	/* if there's no work to do, succeed even with invalid offset */
	if (amount == 0)
	{
		return;
	}

	if (!ColumnarLogicalOffsetIsValid(logicalOffset))
	{
		/* rd_id is an Oid (unsigned), so it is printed with %u */
		elog(ERROR,
			 "attempted columnar write on relation %u to invalid logical offset: "
			 UINT64_FORMAT,
			 rel->rd_id, logicalOffset);
	}

	uint64 written = 0;

	while (written < amount)
	{
		PhysicalAddr addr = LogicalToPhysical(logicalOffset + written);

		/*
		 * Never write past the end of the current block. The result is at
		 * most BLCKSZ, so it fits the uint32 length WriteToBlock expects.
		 */
		uint32 to_write = Min(amount - written, BLCKSZ - addr.offset);

		WriteToBlock(rel, addr.blockno, addr.offset, data + written, to_write,
					 false);
		written += to_write;
	}
}
/*
 * ColumnarStorageTruncate - truncate the columnar storage such that
 * newDataReservation will be the first unused logical offset available. Free
 * pages at the end of the relation.
 *
 * Caller must hold AccessExclusiveLock on the relation.
 *
 * Raises an error if newDataReservation is not a valid logical offset or is
 * higher than the offset currently reserved in the metapage.
 *
 * Returns true if pages were truncated; false otherwise.
 */
bool
ColumnarStorageTruncate(Relation rel, uint64 newDataReservation)
{
	if (!ColumnarLogicalOffsetIsValid(newDataReservation))
	{
		/* rd_id is an Oid (unsigned), so it is printed with %u */
		elog(ERROR,
			 "attempted to truncate relation %u to invalid logical offset: " UINT64_FORMAT,
			 rel->rd_id, newDataReservation);
	}

	RelationOpenSmgr(rel);
	BlockNumber old_rel_pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
	if (old_rel_pages == 0)
	{
		/* nothing to do */
		return false;
	}

	LockRelationForExtension(rel, ExclusiveLock);

	ColumnarMetapage metapage = ColumnarMetapageRead(rel, false);

	if (metapage.reservedOffset < newDataReservation)
	{
		elog(ERROR,
			 "attempted to truncate relation %u to offset " UINT64_FORMAT
			 " which is higher than existing offset " UINT64_FORMAT,
			 rel->rd_id, newDataReservation, metapage.reservedOffset);
	}

	if (metapage.reservedOffset == newDataReservation)
	{
		/* nothing to do */
		UnlockRelationForExtension(rel, ExclusiveLock);
		return false;
	}

	metapage.reservedOffset = newDataReservation;

	/* write new reservation */
	WriteToBlock(rel, COLUMNAR_METAPAGE_BLOCKNO, SizeOfPageHeaderData,
				 (char *) &metapage, sizeof(ColumnarMetapage),
				 true /* clear because we are overwriting */);

	UnlockRelationForExtension(rel, ExclusiveLock);

	/* the block holding the last reserved byte is the last one to keep */
	PhysicalAddr final = LogicalToPhysical(newDataReservation - 1);
	BlockNumber new_rel_pages = final.blockno + 1;
	Assert(new_rel_pages <= old_rel_pages);

	/*
	 * Truncate the storage. Note that RelationTruncate() takes care of
	 * Write Ahead Logging.
	 */
	if (new_rel_pages < old_rel_pages)
	{
		RelationTruncate(rel, new_rel_pages);
		return true;
	}

	return false;
}
/*
 * ColumnarMetapageRead - return a copy of the metapage as currently stored.
 *
 * Raises an error if the relation has no blocks at all (no metapage). Unless
 * 'force' is true, also raises an error when the metapage is not at the
 * current version.
 *
 * NB: reading a metapage of a different version is safe because fields are
 * only ever added and existing fields never change. However, fields that
 * were added later cannot be trusted when an old metapage is read; an old
 * metapage should only be read for upgrading or for error checking.
 */
static ColumnarMetapage
ColumnarMetapageRead(Relation rel, bool force)
{
	RelationOpenSmgr(rel);
	if (smgrnblocks(rel->rd_smgr, MAIN_FORKNUM) == 0)
	{
		elog(ERROR, "columnar metapage for relation \"%s\" does not exist",
			 RelationGetRelationName(rel));
	}

	/* the metapage contents start right after the page header */
	ColumnarMetapage result;
	ReadFromBlock(rel, COLUMNAR_METAPAGE_BLOCKNO, SizeOfPageHeaderData,
				  (char *) &result, sizeof(ColumnarMetapage), force);

	/* callers that force the read accept whatever version is stored */
	if (force)
	{
		return result;
	}

	ColumnarMetapageCheckVersion(rel, &result);
	return result;
}
/*
 * ReadFromBlock - read bytes from a page at the given offset. If 'force' is
 * true, don't check pd_lower; useful when reading a metapage of unknown
 * version.
 *
 * Copies 'len' bytes starting at byte 'offset' of block 'blockno' into
 * 'buf'. Raises an error if the requested range extends past the end of the
 * block, or (unless 'force') past the page's pd_lower.
 *
 * The buffer content lock is held in share mode while the page is examined
 * and copied, so a concurrent WriteToBlock (which takes the exclusive lock)
 * cannot modify the page mid-copy.
 */
static void
ReadFromBlock(Relation rel, BlockNumber blockno, uint32 offset, char *buf,
			  uint32 len, bool force)
{
	Buffer buffer = ReadBuffer(rel, blockno);

	LockBuffer(buffer, BUFFER_LOCK_SHARE);

	Page page = BufferGetPage(buffer);
	PageHeader phdr = (PageHeader) page;

	/* widen before adding so that offset + len cannot wrap around uint32 */
	uint64 endOffset = (uint64) offset + len;

	if (BLCKSZ < endOffset || (!force && (phdr->pd_lower < endOffset)))
	{
		/* all four arguments are unsigned, so they are printed with %u */
		elog(ERROR,
			 "attempt to read columnar data of length %u from offset %u of block %u of relation %u",
			 len, offset, blockno, rel->rd_id);
	}

	memcpy_s(buf, len, page + offset, len);

	UnlockReleaseBuffer(buffer);
}
/*
 * WriteToBlock - append data to a block, initializing if necessary, and emit
 * WAL. If 'clear' is true, always clear the data on the page and reinitialize
 * it first, and offset must be SizeOfPageHeaderData. Otherwise, offset must
 * be equal to pd_lower and pd_lower will be set to the end of the written
 * data.
 *
 * Raises an error if 'offset' does not match pd_lower or if 'len' bytes do
 * not fit between 'offset' and pd_upper.
 */
static void
WriteToBlock(Relation rel, BlockNumber blockno, uint32 offset, char *buf,
			 uint32 len, bool clear)
{
	Buffer buffer = ReadBuffer(rel, blockno);

	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

	Page page = BufferGetPage(buffer);
	PageHeader phdr = (PageHeader) page;

	if (PageIsNew(page) || clear)
	{
		PageInit(page, BLCKSZ, 0);
	}

	if (phdr->pd_lower != offset || phdr->pd_upper - offset < len)
	{
		/* all four arguments are unsigned, so they are printed with %u */
		elog(ERROR,
			 "attempt to write columnar data of length %u to offset %u of block %u of relation %u",
			 len, offset, blockno, rel->rd_id);
	}

	START_CRIT_SECTION();

	/* append at pd_lower and move pd_lower past the new data */
	memcpy_s(page + phdr->pd_lower, phdr->pd_upper - phdr->pd_lower, buf, len);
	phdr->pd_lower += len;

	MarkBufferDirty(buffer);

	if (RelationNeedsWAL(rel))
	{
		XLogBeginInsert();

		/*
		 * Since columnar will mostly write whole pages we force the
		 * transmission of the whole image in the buffer
		 */
		XLogRegisterBuffer(0, buffer, REGBUF_FORCE_IMAGE);

		XLogRecPtr recptr = XLogInsert(RM_GENERIC_ID, 0);
		PageSetLSN(page, recptr);
	}

	END_CRIT_SECTION();

	UnlockReleaseBuffer(buffer);
}
/*
 * AlignReservation - round an unused logical byte offset up so that it maps
 * to the first usable byte of a page (the byte right after the page header).
 * An offset that already maps there is returned unchanged.
 *
 * XXX: Reconsider whether we want/need to do this at all.
 */
static uint64
AlignReservation(uint64 prevReservation)
{
	PhysicalAddr prevAddr = LogicalToPhysical(prevReservation);

	/* already at the start of a page's data area: nothing to adjust */
	if (prevAddr.offset == SizeOfPageHeaderData)
	{
		return prevReservation;
	}

	/* otherwise move to the start of the data area of the following page */
	PhysicalAddr nextPageStart = { 0 };
	nextPageStart.blockno = prevAddr.blockno + 1;
	nextPageStart.offset = SizeOfPageHeaderData;

	uint64 alignedReservation = PhysicalToLogical(nextPageStart);
	Assert(alignedReservation >= prevReservation);

	return alignedReservation;
}
/*
 * ColumnarMetapageIsCurrent - true when both the major and the minor version
 * stored in the metapage equal COLUMNAR_VERSION_MAJOR and
 * COLUMNAR_VERSION_MINOR.
 */
static bool
ColumnarMetapageIsCurrent(ColumnarMetapage *metapage)
{
	if (metapage->versionMajor != COLUMNAR_VERSION_MAJOR)
	{
		return false;
	}

	return metapage->versionMinor == COLUMNAR_VERSION_MINOR;
}
/*
 * ColumnarMetapageIsOlder - true when the version stored in the metapage
 * precedes the current version: a lower major version, or the same major
 * version with a lower minor version.
 */
static bool
ColumnarMetapageIsOlder(ColumnarMetapage *metapage)
{
	/* differing major versions decide the answer on their own */
	if (metapage->versionMajor != COLUMNAR_VERSION_MAJOR)
	{
		return metapage->versionMajor < COLUMNAR_VERSION_MAJOR;
	}

	/* same major version: compare the minor versions as ints */
	return (int) metapage->versionMinor < (int) COLUMNAR_VERSION_MINOR;
}
/*
 * ColumnarMetapageIsNewer - true when the version stored in the metapage
 * follows the current version: a higher major version, or the same major
 * version with a higher minor version.
 */
static bool
ColumnarMetapageIsNewer(ColumnarMetapage *metapage)
{
	/* differing major versions decide the answer on their own */
	if (metapage->versionMajor != COLUMNAR_VERSION_MAJOR)
	{
		return metapage->versionMajor > COLUMNAR_VERSION_MAJOR;
	}

	/* same major version: the minor version breaks the tie */
	return metapage->versionMinor > COLUMNAR_VERSION_MINOR;
}
/*
 * ColumnarMetapageCheckVersion - raise an error unless the metapage is at
 * the current columnar format version.
 *
 * NOTE(review): the check fires for any non-current version, while the
 * message text always says "older".
 */
static void
ColumnarMetapageCheckVersion(Relation rel, ColumnarMetapage *metapage)
{
	if (ColumnarMetapageIsCurrent(metapage))
	{
		/* current version: nothing to report */
		return;
	}

	ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					errmsg(
						"attempted to access relation \"%s\", which uses an older columnar format",
						RelationGetRelationName(rel)),
					errdetail(
						"Columnar format version %d.%d is required, \"%s\" has version %d.%d.",
						COLUMNAR_VERSION_MAJOR, COLUMNAR_VERSION_MINOR,
						RelationGetRelationName(rel),
						metapage->versionMajor, metapage->versionMinor),
					errhint(
						"Use VACUUM to upgrade the columnar table format version.")));
}

View File

@ -51,6 +51,7 @@
#include "columnar/columnar.h"
#include "columnar/columnar_customscan.h"
#include "columnar/columnar_storage.h"
#include "columnar/columnar_tableam.h"
#include "columnar/columnar_version_compat.h"
#include "distributed/commands.h"
@ -516,17 +517,24 @@ columnar_relation_set_new_filenode(Relation rel,
errmsg("unlogged columnar tables are not supported")));
}
Oid oldRelfilenode = rel->rd_node.relNode;
/*
* If existing and new relfilenode are different, that means the existing
* storage was dropped and we also need to clean up the metadata and
* state. If they are equal, this is a new relation object and we don't
* need to clean anything.
*/
if (rel->rd_node.relNode != newrnode->relNode)
{
MarkRelfilenodeDropped(rel->rd_node.relNode, GetCurrentSubTransactionId());
MarkRelfilenodeDropped(oldRelfilenode, GetCurrentSubTransactionId());
/* delete old relfilenode metadata */
DeleteMetadataRows(rel->rd_node);
DeleteMetadataRows(rel->rd_node);
}
*freezeXid = RecentXmin;
*minmulti = GetOldestMultiXactId();
SMgrRelation srel = RelationCreateStorage(*newrnode, persistence);
ColumnarStorageInit(srel, ColumnarMetadataNewStorageId());
InitColumnarOptions(rel->rd_id);
smgrclose(srel);
@ -554,7 +562,9 @@ columnar_relation_nontransactional_truncate(Relation rel)
*/
RelationTruncate(rel, 0);
/* we will lazily initialize new metadata in first stripe reservation */
uint64 storageId = ColumnarMetadataNewStorageId();
RelationOpenSmgr(rel);
ColumnarStorageInit(rel->rd_smgr, storageId);
}
@ -840,34 +850,25 @@ TruncateColumnar(Relation rel, int elevel)
return;
}
RelationOpenSmgr(rel);
BlockNumber old_rel_pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
RelationCloseSmgr(rel);
/*
* Due to the AccessExclusive lock there's no danger that
* new stripes be added beyond highestPhysicalAddress while
* we're truncating.
*/
SmgrAddr highestPhysicalAddress =
logical_to_smgr(GetHighestUsedAddress(rel->rd_node));
uint64 newDataReservation = Max(GetHighestUsedAddress(rel->rd_node) + 1,
ColumnarFirstLogicalOffset);
/*
* Unlock and return if truncation won't reduce data file's size.
*/
BlockNumber new_rel_pages = Min(old_rel_pages,
highestPhysicalAddress.blockno + 1);
if (new_rel_pages == old_rel_pages)
RelationOpenSmgr(rel);
BlockNumber old_rel_pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
if (!ColumnarStorageTruncate(rel, newDataReservation))
{
UnlockRelation(rel, AccessExclusiveLock);
return;
}
/*
* Truncate the storage. Note that RelationTruncate() takes care of
* Write Ahead Logging.
*/
RelationTruncate(rel, new_rel_pages);
RelationOpenSmgr(rel);
BlockNumber new_rel_pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
/*
* We can release the exclusive lock as soon as we have truncated.
@ -1822,3 +1823,75 @@ alter_columnar_table_reset(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
/*
 * upgrade_columnar_storage - SQL-callable entry point that runs the columnar
 * storage update for one relation in the upgrade direction.
 *
 * Raises an error if the given relation is not a columnar table.
 *
 * DDL:
 * CREATE OR REPLACE FUNCTION upgrade_columnar_storage(rel regclass)
 * RETURNS VOID
 * STRICT
 * LANGUAGE c AS 'MODULE_PATHNAME', 'upgrade_columnar_storage';
 */
PG_FUNCTION_INFO_V1(upgrade_columnar_storage);
Datum
upgrade_columnar_storage(PG_FUNCTION_ARGS)
{
	Oid relationId = PG_GETARG_OID(0);

	/*
	 * The low-level storage routines do not require ACCESS EXCLUSIVE LOCK,
	 * so ACCESS SHARE LOCK would be enough for them. However, all other
	 * access to a non-current columnar table fails anyway, so it is better
	 * to take ACCESS EXCLUSIVE LOCK right away.
	 */
	Relation relation = table_open(relationId, AccessExclusiveLock);

	if (!IsColumnarTableAmTable(relationId))
	{
		const char *relationName =
			quote_identifier(RelationGetRelationName(relation));

		ereport(ERROR, (errmsg("table %s is not a columnar table",
							   relationName)));
	}

	/* second argument true selects the upgrade direction */
	ColumnarStorageUpdateIfNeeded(relation, true);

	table_close(relation, AccessExclusiveLock);

	PG_RETURN_VOID();
}
/*
 * downgrade_columnar_storage - SQL-callable entry point that runs the
 * columnar storage update for one relation in the downgrade direction.
 *
 * Raises an error if the given relation is not a columnar table.
 *
 * DDL:
 * CREATE OR REPLACE FUNCTION downgrade_columnar_storage(rel regclass)
 * RETURNS VOID
 * STRICT
 * LANGUAGE c AS 'MODULE_PATHNAME', 'downgrade_columnar_storage';
 */
PG_FUNCTION_INFO_V1(downgrade_columnar_storage);
Datum
downgrade_columnar_storage(PG_FUNCTION_ARGS)
{
	Oid relationId = PG_GETARG_OID(0);

	/*
	 * The low-level storage routines do not require ACCESS EXCLUSIVE LOCK,
	 * so ACCESS SHARE LOCK would be enough for them. However, all other
	 * access to a non-current columnar table fails anyway, so it is better
	 * to take ACCESS EXCLUSIVE LOCK right away.
	 */
	Relation relation = table_open(relationId, AccessExclusiveLock);

	if (!IsColumnarTableAmTable(relationId))
	{
		const char *relationName =
			quote_identifier(RelationGetRelationName(relation));

		ereport(ERROR, (errmsg("table %s is not a columnar table",
							   relationName)));
	}

	/* second argument false selects the downgrade direction */
	ColumnarStorageUpdateIfNeeded(relation, false);

	table_close(relation, AccessExclusiveLock);

	PG_RETURN_VOID();
}

View File

@ -30,6 +30,7 @@
#include "utils/relfilenodemap.h"
#include "columnar/columnar.h"
#include "columnar/columnar_storage.h"
#include "columnar/columnar_version_compat.h"
struct ColumnarWriteState
@ -351,80 +352,6 @@ CreateEmptyStripeSkipList(uint32 stripeMaxRowCount, uint32 chunkRowCount,
}
/*
* WriteToSmgr - write dataLength bytes from data into the relation, starting
* at the given logical offset and continuing across as many pages as needed.
* Each touched page is initialized if new, appended to at pd_lower, marked
* dirty, and WAL-logged as a full page image when the relation needs WAL.
*/
void
WriteToSmgr(Relation rel, uint64 logicalOffset, char *data, uint32 dataLength)
{
uint64 remaining = dataLength;
Buffer buffer;
while (remaining > 0)
{
/* translate the current logical offset to a block number and page offset */
SmgrAddr addr = logical_to_smgr(logicalOffset);
/* the relation size is only fetched to assert that the target block exists */
RelationOpenSmgr(rel);
BlockNumber nblocks PG_USED_FOR_ASSERTS_ONLY =
smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
Assert(addr.blockno < nblocks);
RelationCloseSmgr(rel);
buffer = ReadBuffer(rel, addr.blockno);
LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
Page page = BufferGetPage(buffer);
PageHeader phdr = (PageHeader) page;
if (PageIsNew(page))
{
PageInit(page, BLCKSZ, 0);
}
/*
* After a transaction has been rolled-back, we might be
* over-writing the rolledback write, so phdr->pd_lower can be
* different from addr.offset.
*
* We reset pd_lower to reset the rolledback write.
*/
if (phdr->pd_lower > addr.offset)
{
ereport(DEBUG1, (errmsg("over-writing page %u", addr.blockno),
errdetail("This can happen after a roll-back.")));
phdr->pd_lower = addr.offset;
}
Assert(phdr->pd_lower == addr.offset);
START_CRIT_SECTION();
/* write as much as fits in the free space between pd_lower and pd_upper */
uint64 to_write = Min(phdr->pd_upper - phdr->pd_lower, remaining);
memcpy_s(page + phdr->pd_lower, phdr->pd_upper - phdr->pd_lower, data, to_write);
phdr->pd_lower += to_write;
MarkBufferDirty(buffer);
if (RelationNeedsWAL(rel))
{
XLogBeginInsert();
/*
* Since columnar will mostly write whole pages we force the transmission of the
* whole image in the buffer
*/
XLogRegisterBuffer(0, buffer, REGBUF_FORCE_IMAGE);
XLogRecPtr recptr = XLogInsert(RM_GENERIC_ID, 0);
PageSetLSN(page, recptr);
}
END_CRIT_SECTION();
UnlockReleaseBuffer(buffer);
/* advance the source pointer, remaining count and logical offset */
data += to_write;
remaining -= to_write;
logicalOffset += to_write;
}
}
/*
* FlushStripe flushes current stripe data into the file. The function first ensures
* the last data chunk for each column is properly serialized and compressed. Then,
@ -527,8 +454,8 @@ FlushStripe(ColumnarWriteState *writeState)
columnBuffers->chunkBuffersArray[chunkIndex];
StringInfo existsBuffer = chunkBuffers->existsBuffer;
WriteToSmgr(relation, currentFileOffset,
existsBuffer->data, existsBuffer->len);
ColumnarStorageWrite(relation, currentFileOffset,
existsBuffer->data, existsBuffer->len);
currentFileOffset += existsBuffer->len;
}
@ -538,8 +465,8 @@ FlushStripe(ColumnarWriteState *writeState)
columnBuffers->chunkBuffersArray[chunkIndex];
StringInfo valueBuffer = chunkBuffers->valueBuffer;
WriteToSmgr(relation, currentFileOffset,
valueBuffer->data, valueBuffer->len);
ColumnarStorageWrite(relation, currentFileOffset,
valueBuffer->data, valueBuffer->len);
currentFileOffset += valueBuffer->len;
}
}

View File

@ -20,3 +20,10 @@ END$proc$;
-- since we dropped pg11 support, we don't need to worry about missing
-- columnar objects when upgrading postgres
DROP FUNCTION citus_internal.columnar_ensure_objects_exist();
#include "udfs/upgrade_columnar_storage/10.1-1.sql"
#include "udfs/downgrade_columnar_storage/10.1-1.sql"
-- upgrade storage for all columnar relations
SELECT citus_internal.upgrade_columnar_storage(c.oid) FROM pg_class c, pg_am a
WHERE c.relam = a.oid AND amname = 'columnar';

View File

@ -11,3 +11,10 @@ REFERENCES columnar.stripe(storage_id, stripe_num) ON DELETE CASCADE;
-- define columnar_ensure_objects_exist again
#include "../udfs/columnar_ensure_objects_exist/10.0-1.sql"
-- downgrade storage for all columnar relations
SELECT citus_internal.downgrade_columnar_storage(c.oid) FROM pg_class c, pg_am a
WHERE c.relam = a.oid AND amname = 'columnar';
DROP FUNCTION citus_internal.upgrade_columnar_storage(regclass);
DROP FUNCTION citus_internal.downgrade_columnar_storage(regclass);

View File

@ -0,0 +1,7 @@
-- Declares the SQL-level function backed by the C symbol
-- downgrade_columnar_storage in the extension library. It is STRICT, so it
-- is not invoked for a NULL argument.
CREATE OR REPLACE FUNCTION citus_internal.downgrade_columnar_storage(rel regclass)
RETURNS VOID
STRICT
LANGUAGE c AS 'MODULE_PATHNAME', $$downgrade_columnar_storage$$;
COMMENT ON FUNCTION citus_internal.downgrade_columnar_storage(regclass)
IS 'function to downgrade the columnar storage, if necessary';

View File

@ -0,0 +1,7 @@
-- Declares the SQL-level function backed by the C symbol
-- downgrade_columnar_storage in the extension library. It is STRICT, so it
-- is not invoked for a NULL argument.
CREATE OR REPLACE FUNCTION citus_internal.downgrade_columnar_storage(rel regclass)
RETURNS VOID
STRICT
LANGUAGE c AS 'MODULE_PATHNAME', $$downgrade_columnar_storage$$;
COMMENT ON FUNCTION citus_internal.downgrade_columnar_storage(regclass)
IS 'function to downgrade the columnar storage, if necessary';

View File

@ -0,0 +1,7 @@
-- Declares the SQL-level function backed by the C symbol
-- upgrade_columnar_storage in the extension library. It is STRICT, so it is
-- not invoked for a NULL argument.
CREATE OR REPLACE FUNCTION citus_internal.upgrade_columnar_storage(rel regclass)
RETURNS VOID
STRICT
LANGUAGE c AS 'MODULE_PATHNAME', $$upgrade_columnar_storage$$;
COMMENT ON FUNCTION citus_internal.upgrade_columnar_storage(regclass)
IS 'function to upgrade the columnar storage, if necessary';

View File

@ -0,0 +1,7 @@
-- Declares the SQL-level function backed by the C symbol
-- upgrade_columnar_storage in the extension library. It is STRICT, so it is
-- not invoked for a NULL argument.
CREATE OR REPLACE FUNCTION citus_internal.upgrade_columnar_storage(rel regclass)
RETURNS VOID
STRICT
LANGUAGE c AS 'MODULE_PATHNAME', $$upgrade_columnar_storage$$;
COMMENT ON FUNCTION citus_internal.upgrade_columnar_storage(regclass)
IS 'function to upgrade the columnar storage, if necessary';

View File

@ -25,7 +25,6 @@
#include "distributed/lock_graph.h"
#include "distributed/metadata_cache.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/tuplestore.h"
#include "storage/proc.h"
#include "utils/builtins.h"
@ -472,18 +471,9 @@ IsProcessWaitingForSafeOperations(PGPROC *proc)
PROCLOCK *waitProcLock = proc->waitProcLock;
LOCK *waitLock = waitProcLock->tag.myLock;
/*
* Stripe reservation locks are temporary & don't hold until end of
* transaction, so we shouldn't include them in the lock graph.
*/
bool stripeReservationLock =
waitLock->tag.locktag_type == LOCKTAG_ADVISORY &&
waitLock->tag.locktag_field4 == ADV_LOCKTAG_CLASS_COLUMNAR_STRIPE_RESERVATION;
return waitLock->tag.locktag_type == LOCKTAG_RELATION_EXTEND ||
waitLock->tag.locktag_type == LOCKTAG_PAGE ||
waitLock->tag.locktag_type == LOCKTAG_SPECULATIVE_TOKEN ||
stripeReservationLock;
waitLock->tag.locktag_type == LOCKTAG_SPECULATIVE_TOKEN;
}

View File

@ -39,8 +39,8 @@
#define COMPRESSION_LEVEL_MAX 19
/* Columnar file signature */
#define COLUMNAR_VERSION_MAJOR 1
#define COLUMNAR_VERSION_MINOR 7
#define COLUMNAR_VERSION_MAJOR 2
#define COLUMNAR_VERSION_MINOR 0
/* miscellaneous defines */
#define COLUMNAR_TUPLE_COST_MULTIPLIER 10
@ -233,13 +233,11 @@ extern void InitColumnarOptions(Oid regclass);
extern void SetColumnarOptions(Oid regclass, ColumnarOptions *options);
extern bool DeleteColumnarTableOptions(Oid regclass, bool missingOk);
extern bool ReadColumnarOptions(Oid regclass, ColumnarOptions *options);
extern void WriteToSmgr(Relation relation, uint64 logicalOffset,
char *data, uint32 dataLength);
extern StringInfo ReadFromSmgr(Relation rel, uint64 offset, uint32 size);
extern bool IsColumnarTableAmTable(Oid relationId);
/* columnar_metadata_tables.c */
extern void DeleteMetadataRows(RelFileNode relfilenode);
extern uint64 ColumnarMetadataNewStorageId(void);
extern uint64 GetHighestUsedAddress(RelFileNode relfilenode);
extern StripeMetadata ReserveStripe(Relation rel, uint64 size,
uint64 rowCount, uint64 columnCount,
@ -271,51 +269,5 @@ extern bool PendingWritesInUpperTransactions(Oid relfilenode,
SubTransactionId currentSubXid);
extern MemoryContext GetWriteContextForDebug(void);
/*
* SmgrAddr - physical location of a byte in the relation: the block it lives
* in and its byte offset within that block. As produced by logical_to_smgr,
* the offset is measured from the start of the page, so it includes the page
* header.
*/
typedef struct SmgrAddr
{
BlockNumber blockno;
uint32 offset;
} SmgrAddr;
/*
 * Translate a logical offset (as tracked in the metadata) into the physical
 * block and in-page offset where that byte is kept. Each page contributes
 * COLUMNAR_BYTES_PER_PAGE logical bytes, placed after the page header.
 */
static inline SmgrAddr
logical_to_smgr(uint64 logicalOffset)
{
	SmgrAddr result = {
		.blockno = logicalOffset / COLUMNAR_BYTES_PER_PAGE,
		.offset = SizeOfPageHeaderData + (logicalOffset % COLUMNAR_BYTES_PER_PAGE)
	};

	return result;
}
/*
 * Translate a physical block and in-page offset back into a logical
 * address; the inverse of logical_to_smgr.
 */
static inline uint64
smgr_to_logical(SmgrAddr addr)
{
	uint64 logicalOffset =
		COLUMNAR_BYTES_PER_PAGE * addr.blockno + addr.offset - SizeOfPageHeaderData;

	return logicalOffset;
}
/*
 * Return the address of the first usable byte (right after the page header)
 * of the block that follows addr's block.
 */
static inline SmgrAddr
next_block_start(SmgrAddr addr)
{
	SmgrAddr next;

	next.blockno = addr.blockno + 1;
	next.offset = SizeOfPageHeaderData;

	return next;
}
#endif /* COLUMNAR_H */

View File

@ -28,5 +28,6 @@ typedef struct StripeMetadata
} StripeMetadata;
extern List * StripesForRelfilenode(RelFileNode relfilenode);
extern void ColumnarStorageUpdateIfNeeded(Relation rel, bool isUpgrade);
#endif /* COLUMNAR_METADATA_H */

View File

@ -0,0 +1,54 @@
/*-------------------------------------------------------------------------
*
* columnar_storage.h
*
* Type and function declarations for storage of columnar data in blocks.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef COLUMNAR_STORAGE_H
#define COLUMNAR_STORAGE_H
#include "postgres.h"
#include "storage/smgr.h"
#include "utils/rel.h"
/*
* Logical offsets never fall on the first two physical pages. See
* comments in columnar_storage.c.
*/
#define ColumnarInvalidLogicalOffset 0
#define ColumnarFirstLogicalOffset ((BLCKSZ - SizeOfPageHeaderData) * 2)
#define ColumnarLogicalOffsetIsValid(X) ((X) >= ColumnarFirstLogicalOffset)
extern void ColumnarStorageInit(SMgrRelation srel, uint64 storageId);
extern bool ColumnarStorageIsCurrent(Relation rel);
extern void ColumnarStorageUpdateCurrent(Relation rel, bool upgrade,
uint64 reservedStripeId,
uint64 reservedRowNumber,
uint64 reservedOffset);
extern uint64 ColumnarStorageGetVersionMajor(Relation rel, bool force);
extern uint64 ColumnarStorageGetVersionMinor(Relation rel, bool force);
extern uint64 ColumnarStorageGetStorageId(Relation rel, bool force);
extern uint64 ColumnarStorageGetReservedStripeId(Relation rel, bool force);
extern uint64 ColumnarStorageGetReservedRowNumber(Relation rel, bool force);
extern uint64 ColumnarStorageGetReservedOffset(Relation rel, bool force);
extern uint64 ColumnarStorageReserveData(Relation rel, uint64 amount);
extern uint64 ColumnarStorageReserveStripe(Relation rel, uint64 nrows,
uint64 *firstRowNumber);
extern void ColumnarStorageRead(Relation rel, uint64 logicalOffset,
char *data, uint32 amount);
extern void ColumnarStorageWrite(Relation rel, uint64 logicalOffset,
char *data, uint32 amount);
extern bool ColumnarStorageTruncate(Relation rel, uint64 newDataReservation);
#endif /* COLUMNAR_STORAGE_H */

View File

@ -39,10 +39,7 @@ typedef enum AdvisoryLocktagClass
ADV_LOCKTAG_CLASS_CITUS_REBALANCE_COLOCATION = 7,
ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA = 8,
ADV_LOCKTAG_CLASS_CITUS_OPERATIONS = 9,
ADV_LOCKTAG_CLASS_CITUS_PLACEMENT_CLEANUP = 10,
/* Columnar lock types */
ADV_LOCKTAG_CLASS_COLUMNAR_STRIPE_RESERVATION = 11
ADV_LOCKTAG_CLASS_CITUS_PLACEMENT_CLEANUP = 10
} AdvisoryLocktagClass;
/* CitusOperations has constants for citus operations */
@ -102,13 +99,6 @@ typedef enum CitusOperations
(uint32) operationId, \
ADV_LOCKTAG_CLASS_CITUS_OPERATIONS)
#define SET_LOCKTAG_COLUMNAR_STRIPE_RESERVATION(tag, relation) \
SET_LOCKTAG_ADVISORY(tag, \
relation->rd_lockInfo.lockRelId.dbId, \
relation->rd_lockInfo.lockRelId.relId, \
0, \
ADV_LOCKTAG_CLASS_COLUMNAR_STRIPE_RESERVATION)
/* reuse advisory lock, but with different, unused field 4 (10)
* Also it has the database hardcoded to MyDatabaseId, to ensure the locks
* are local to each database */

View File

@ -3,3 +3,4 @@
test: upgrade_basic_after
test: upgrade_partition_constraints_after
test: upgrade_pg_dist_object_test_after
test: upgrade_columnar_metapage_after

View File

@ -3,3 +3,4 @@
test: upgrade_basic_before
test: upgrade_partition_constraints_before
test: upgrade_pg_dist_object_test_before
test: upgrade_columnar_metapage_before

View File

@ -12,6 +12,14 @@ WITH sample_data AS (VALUES
INSERT INTO test_alter_table SELECT * FROM sample_data;
-- drop a column
ALTER TABLE test_alter_table DROP COLUMN a;
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('test_alter_table');
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
---------------------------------------------------------------------
2 | 0 | 2 | 1 | 16402
(1 row)
-- test analyze
ANALYZE test_alter_table;
-- verify select queries run as expected
@ -59,6 +67,14 @@ SELECT * FROM test_alter_table;
3 | 5 | 8
(5 rows)
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('test_alter_table');
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
---------------------------------------------------------------------
2 | 0 | 4 | 1 | 32724
(1 row)
-- add a fixed-length column with default value
ALTER TABLE test_alter_table ADD COLUMN e int default 3;
SELECT * from test_alter_table;
@ -83,6 +99,14 @@ SELECT * from test_alter_table;
1 | 2 | 4 | 8
(6 rows)
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('test_alter_table');
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
---------------------------------------------------------------------
2 | 0 | 5 | 1 | 40906
(1 row)
-- add a variable-length column with default value
ALTER TABLE test_alter_table ADD COLUMN f text DEFAULT 'TEXT ME';
SELECT * from test_alter_table;

View File

@ -5,11 +5,16 @@
CREATE TABLE contestant (handle TEXT, birthdate DATE, rating INT,
percentile FLOAT, country CHAR(3), achievements TEXT[])
USING columnar;
SELECT alter_columnar_table_set('contestant', compression => 'none');
alter_columnar_table_set
---------------------------------------------------------------------
(1 row)
-- should fail
CREATE INDEX contestant_idx on contestant(handle);
ERROR: indexes not supported for columnar tables
-- Create compressed table with automatically determined file path
-- COMPRESSED
-- Create zstd compressed table
CREATE TABLE contestant_compressed (handle TEXT, birthdate DATE, rating INT,
percentile FLOAT, country CHAR(3), achievements TEXT[])
USING columnar;

View File

@ -26,7 +26,7 @@ SELECT alter_columnar_table_set('t_compressed', chunk_group_row_limit => 100);
SELECT * FROM columnar.options WHERE regclass = 't_compressed'::regclass;
regclass | chunk_group_row_limit | stripe_row_limit | compression_level | compression
---------------------------------------------------------------------
t_compressed | 100 | 100 | 3 | pglz
t_compressed | 100 | 100 | 3 | pglz
(1 row)
-- select
@ -52,6 +52,23 @@ select count(*) from t_compressed;
0
(1 row)
-- check storage
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('t_compressed');
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
---------------------------------------------------------------------
2 | 0 | 1 | 1 | 16336
(1 row)
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('t_uncompressed');
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
---------------------------------------------------------------------
2 | 0 | 1 | 1 | 16336
(1 row)
-- explain
explain (costs off, summary off, timing off) select * from t_uncompressed;
QUERY PLAN
@ -68,16 +85,16 @@ explain (costs off, summary off, timing off) select * from t_compressed;
-- vacuum
vacuum verbose t_compressed;
INFO: statistics for "t_compressed":
storage id: -1
total file size: 0, total data size: 0
storage id: xxxxx
total file size: 16384, total data size: 0
compression rate: 1.00x
total row count: 0, stripe count: 0, average rows per stripe: 0
chunk count: 0, containing data for dropped columns: 0
vacuum verbose t_uncompressed;
INFO: statistics for "t_uncompressed":
storage id: -1
total file size: 0, total data size: 0
storage id: xxxxx
total file size: 16384, total data size: 0
compression rate: 1.00x
total row count: 0, stripe count: 0, average rows per stripe: 0
chunk count: 0, containing data for dropped columns: 0
@ -85,6 +102,23 @@ chunk count: 0, containing data for dropped columns: 0
-- vacuum full
vacuum full t_compressed;
vacuum full t_uncompressed;
-- check storage
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('t_compressed');
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
---------------------------------------------------------------------
2 | 0 | 1 | 1 | 16336
(1 row)
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('t_uncompressed');
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
---------------------------------------------------------------------
2 | 0 | 1 | 1 | 16336
(1 row)
-- analyze
analyze t_uncompressed;
analyze t_compressed;
@ -94,6 +128,23 @@ truncate t_compressed;
-- alter type
alter table t_uncompressed alter column a type text;
alter table t_compressed alter column a type text;
-- check storage
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('t_compressed');
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
---------------------------------------------------------------------
2 | 0 | 1 | 1 | 16336
(1 row)
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('t_uncompressed');
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
---------------------------------------------------------------------
2 | 0 | 1 | 1 | 16336
(1 row)
-- verify cost of scanning an empty table is zero, not NaN
explain table t_uncompressed;
QUERY PLAN

View File

@ -45,6 +45,14 @@ select count(*) from test_insert_command;
3
(1 row)
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('test_insert_command');
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
---------------------------------------------------------------------
2 | 0 | 4 | 1 | 32686
(1 row)
SELECT * FROM columnar_test_helpers.chunk_group_consistency;
consistent
---------------------------------------------------------------------
@ -141,6 +149,14 @@ FROM test_toast_columnar;
5004 | 5004 | 5004 | 5004
(1 row)
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('test_toast_columnar');
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
---------------------------------------------------------------------
2 | 0 | 2 | 1 | 16428
(1 row)
SELECT * FROM columnar_test_helpers.chunk_group_consistency;
consistent
---------------------------------------------------------------------
@ -173,6 +189,14 @@ INSERT INTO zero_col_heap SELECT * FROM zero_col_heap;
INSERT INTO zero_col_heap SELECT * FROM zero_col_heap;
INSERT INTO zero_col_heap SELECT * FROM zero_col_heap;
INSERT INTO zero_col SELECT * FROM zero_col_heap;
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('zero_col');
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
---------------------------------------------------------------------
2 | 0 | 6 | 1 | 16336
(1 row)
SELECT relname, stripe_num, chunk_group_count, row_count FROM columnar.stripe a, pg_class b
WHERE columnar_test_helpers.columnar_relation_storageid(b.oid)=a.storage_id AND relname = 'zero_col'
ORDER BY 1,2,3,4;

View File

@ -14,6 +14,14 @@ SELECT count(*) FROM t;
0
(1 row)
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('t');
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
---------------------------------------------------------------------
2 | 0 | 1 | 1 | 16336
(1 row)
-- check stripe metadata also have been rolled-back
SELECT count(*) FROM t_stripes;
count
@ -46,6 +54,14 @@ SELECT count(*) FROM t; -- force flush
SAVEPOINT s1;
INSERT INTO t SELECT i, i+1 FROM generate_series(1, 10) i;
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('t');
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
---------------------------------------------------------------------
2 | 0 | 3 | 1 | 24606
(1 row)
SELECT count(*) FROM t;
count
---------------------------------------------------------------------
@ -68,6 +84,14 @@ SELECT count(*) FROM t;
INSERT INTO t SELECT i, i+1 FROM generate_series(1, 10) i;
COMMIT;
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('t');
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
---------------------------------------------------------------------
2 | 0 | 5 | 1 | 40942
(1 row)
SELECT count(*) FROM t;
count
---------------------------------------------------------------------

View File

@ -3,6 +3,16 @@ SET search_path TO columnar_test_helpers;
CREATE FUNCTION columnar_relation_storageid(relid oid) RETURNS bigint
LANGUAGE C STABLE STRICT
AS 'citus', $$columnar_relation_storageid$$;
CREATE OR REPLACE FUNCTION columnar_storage_info(
rel regclass,
version_major OUT int4,
version_minor OUT int4,
storage_id OUT int8,
reserved_stripe_id OUT int8,
reserved_row_number OUT int8,
reserved_offset OUT int8)
STRICT
LANGUAGE c AS 'citus', $$columnar_storage_info$$;
CREATE FUNCTION compression_type_supported(type text) RETURNS boolean
AS $$
BEGIN

View File

@ -43,7 +43,23 @@ SELECT * FROM columnar_test_helpers.chunk_group_consistency;
t
(1 row)
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('columnar_truncate_test');
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
---------------------------------------------------------------------
2 | 0 | 2 | 1 | 16438
(1 row)
TRUNCATE TABLE columnar_truncate_test;
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('columnar_truncate_test');
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
---------------------------------------------------------------------
2 | 0 | 1 | 1 | 16336
(1 row)
SELECT * FROM columnar_test_helpers.chunk_group_consistency;
consistent
---------------------------------------------------------------------
@ -77,7 +93,7 @@ SELECT count(*) FROM columnar_truncate_test_compressed;
SELECT pg_relation_size('columnar_truncate_test_compressed');
pg_relation_size
---------------------------------------------------------------------
0
16384
(1 row)
INSERT INTO columnar_truncate_test select a, a from generate_series(1, 10) a;

View File

@ -25,6 +25,14 @@ SELECT count(*) FROM t_stripes;
3
(1 row)
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('t');
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
---------------------------------------------------------------------
2 | 0 | 4 | 1 | 32756
(1 row)
-- vacuum full should merge stripes together
VACUUM FULL t;
SELECT * FROM columnar_test_helpers.chunk_group_consistency;
@ -45,6 +53,14 @@ SELECT count(*) FROM t_stripes;
1
(1 row)
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('t');
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
---------------------------------------------------------------------
2 | 0 | 2 | 1 | 16584
(1 row)
-- test the case when all data cannot fit into a single stripe
SELECT alter_columnar_table_set('t', stripe_row_limit => 1000);
alter_columnar_table_set
@ -66,6 +82,14 @@ SELECT count(*) FROM t_stripes;
(1 row)
VACUUM FULL t;
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('t');
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
---------------------------------------------------------------------
2 | 0 | 4 | 1 | 53382
(1 row)
SELECT * FROM columnar_test_helpers.chunk_group_consistency;
consistent
---------------------------------------------------------------------
@ -215,6 +239,14 @@ compression rate: 1.25x
total row count: 5530, stripe count: 5, average rows per stripe: 1106
chunk count: 7, containing data for dropped columns: 0, none compressed: 5, pglz compressed: 2
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('t');
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
---------------------------------------------------------------------
2 | 0 | 16 | 1 | 50686
(1 row)
SELECT * FROM columnar_test_helpers.chunk_group_consistency;
consistent
---------------------------------------------------------------------

View File

@ -564,12 +564,14 @@ SELECT * FROM print_extension_changes();
---------------------------------------------------------------------
function citus_internal.columnar_ensure_objects_exist() |
function create_distributed_table(regclass,text,citus.distribution_type,text) |
| function citus_internal.downgrade_columnar_storage(regclass)
| function citus_internal.upgrade_columnar_storage(regclass)
| function citus_local_disk_space_stats()
| function create_distributed_table(regclass,text,citus.distribution_type,text,integer)
| function worker_partitioned_relation_size(regclass)
| function worker_partitioned_relation_total_size(regclass)
| function worker_partitioned_table_size(regclass)
(7 rows)
(9 rows)
DROP TABLE prev_objects, extension_diff;
-- show running version

View File

@ -0,0 +1,68 @@
\set upgrade_test_old_citus_version `echo "$upgrade_test_old_citus_version"`
SELECT substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int >= 10 AND
substring(:'upgrade_test_old_citus_version', 'v\d+\.(\d+)\.\d+')::int >= 0
AS upgrade_test_old_citus_version_ge_10_0;
upgrade_test_old_citus_version_ge_10_0
---------------------------------------------------------------------
t
(1 row)
\gset
\if :upgrade_test_old_citus_version_ge_10_0
\else
\q
\endif
-- it's not the best practice to define this here, but we don't want to include
-- columnar_test_helpers in upgrade test schedule
CREATE OR REPLACE FUNCTION columnar_storage_info(
rel regclass,
version_major OUT int4,
version_minor OUT int4,
storage_id OUT int8,
reserved_stripe_id OUT int8,
reserved_row_number OUT int8,
reserved_offset OUT int8)
STRICT
LANGUAGE c AS 'citus', 'columnar_storage_info';
SET search_path TO upgrade_columnar_metapage, public;
-- should work since we upgrade metapages when upgrading schema version
INSERT INTO columnar_table_1 VALUES (3);
-- show that all columnar relations' metapages are upgraded to "2.0"
-- (a metapage is stale if either version component differs, hence "or")
SELECT count(*)=0
FROM (SELECT (columnar_storage_info(c.oid)).* t
FROM pg_class c, pg_am a
WHERE c.relam = a.oid AND amname = 'columnar') t
WHERE t.version_major != 2 or t.version_minor != 0;
?column?
---------------------------------------------------------------------
t
(1 row)
-- print metapage for two of the tables
SELECT columnar_storage_info('columnar_table_1');
columnar_storage_info
---------------------------------------------------------------------
(2,0,10000000000,4,0,481936)
(1 row)
SELECT columnar_storage_info('columnar_table_2');
columnar_storage_info
---------------------------------------------------------------------
(2,0,10000000001,2,0,16350)
(1 row)
-- table is already upgraded, make sure that upgrade_columnar_storage is a no-op
SELECT citus_internal.upgrade_columnar_storage(c.oid)
FROM pg_class c, pg_am a
WHERE c.relam = a.oid AND amname = 'columnar' and relname = 'columnar_table_2';
upgrade_columnar_storage
---------------------------------------------------------------------
(1 row)
SELECT columnar_storage_info('columnar_table_2');
columnar_storage_info
---------------------------------------------------------------------
(2,0,10000000001,2,0,16350)
(1 row)

View File

@ -0,0 +1,13 @@
\set upgrade_test_old_citus_version `echo "$upgrade_test_old_citus_version"`
SELECT substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int >= 10 AND
substring(:'upgrade_test_old_citus_version', 'v\d+\.(\d+)\.\d+')::int >= 0
AS upgrade_test_old_citus_version_ge_10_0;
upgrade_test_old_citus_version_ge_10_0
---------------------------------------------------------------------
f
(1 row)
\gset
\if :upgrade_test_old_citus_version_ge_10_0
\else
\q

View File

@ -0,0 +1,23 @@
\set upgrade_test_old_citus_version `echo "$upgrade_test_old_citus_version"`
SELECT substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int >= 10 AND
substring(:'upgrade_test_old_citus_version', 'v\d+\.(\d+)\.\d+')::int >= 0
AS upgrade_test_old_citus_version_ge_10_0;
upgrade_test_old_citus_version_ge_10_0
---------------------------------------------------------------------
t
(1 row)
\gset
\if :upgrade_test_old_citus_version_ge_10_0
\else
\q
\endif
CREATE SCHEMA upgrade_columnar_metapage;
SET search_path TO upgrade_columnar_metapage, public;
CREATE TABLE columnar_table_1(a INT, b INT) USING columnar;
INSERT INTO columnar_table_1 SELECT i FROM generate_series(160001, 320000) i;
CREATE TABLE columnar_table_2(b INT) USING columnar;
INSERT INTO columnar_table_2 VALUES (160);
CREATE TABLE columnar_table_3(b INT) USING columnar;
INSERT INTO columnar_table_3 VALUES (1), (2);
CREATE TABLE no_data_columnar_table(a INT, b INT, c TEXT) USING columnar;

View File

@ -0,0 +1,13 @@
\set upgrade_test_old_citus_version `echo "$upgrade_test_old_citus_version"`
SELECT substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int >= 10 AND
substring(:'upgrade_test_old_citus_version', 'v\d+\.(\d+)\.\d+')::int >= 0
AS upgrade_test_old_citus_version_ge_10_0;
upgrade_test_old_citus_version_ge_10_0
---------------------------------------------------------------------
f
(1 row)
\gset
\if :upgrade_test_old_citus_version_ge_10_0
\else
\q

View File

@ -56,6 +56,7 @@ ORDER BY 1;
function citus_extradata_container(internal)
function citus_finish_pg_upgrade()
function citus_get_active_worker_nodes()
function citus_internal.downgrade_columnar_storage(regclass)
function citus_internal.find_groupid_for_node(text,integer)
function citus_internal.pg_dist_node_trigger_func()
function citus_internal.pg_dist_rebalance_strategy_enterprise_check()
@ -64,6 +65,7 @@ ORDER BY 1;
function citus_internal.refresh_isolation_tester_prepared_statement()
function citus_internal.replace_isolation_tester_func()
function citus_internal.restore_isolation_tester_func()
function citus_internal.upgrade_columnar_storage(regclass)
function citus_isolation_test_session_is_blocked(integer,integer[])
function citus_json_concatenate(json,json)
function citus_json_concatenate_final(json)
@ -246,5 +248,5 @@ ORDER BY 1;
view citus_worker_stat_activity
view pg_dist_shard_placement
view time_partitions
(230 rows)
(232 rows)

View File

@ -53,6 +53,7 @@ ORDER BY 1;
function citus_extradata_container(internal)
function citus_finish_pg_upgrade()
function citus_get_active_worker_nodes()
function citus_internal.downgrade_columnar_storage(regclass)
function citus_internal.find_groupid_for_node(text,integer)
function citus_internal.pg_dist_node_trigger_func()
function citus_internal.pg_dist_rebalance_strategy_enterprise_check()
@ -61,6 +62,7 @@ ORDER BY 1;
function citus_internal.refresh_isolation_tester_prepared_statement()
function citus_internal.replace_isolation_tester_func()
function citus_internal.restore_isolation_tester_func()
function citus_internal.upgrade_columnar_storage(regclass)
function citus_isolation_test_session_is_blocked(integer,integer[])
function citus_json_concatenate(json,json)
function citus_json_concatenate_final(json)
@ -238,5 +240,5 @@ ORDER BY 1;
view citus_worker_stat_activity
view pg_dist_shard_placement
view time_partitions
(222 rows)
(224 rows)

View File

@ -15,14 +15,20 @@ COPY contestant FROM '@abs_srcdir@/data/contestants.1.csv' WITH CSV;
-- COPY into uncompressed table from program
COPY contestant FROM PROGRAM 'cat @abs_srcdir@/data/contestants.2.csv' WITH CSV;
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('contestant');
-- COPY into compressed table
set columnar.compression = 'pglz';
COPY contestant_compressed FROM '@abs_srcdir@/data/contestants.1.csv' WITH CSV;
-- COPY into compressed table from program
COPY contestant_compressed FROM PROGRAM 'cat @abs_srcdir@/data/contestants.2.csv'
WITH CSV;
set columnar.compression to default;
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('contestant_compressed');
-- Test column list
CREATE TABLE famous_constants (id int, name text, value real)

View File

@ -14,13 +14,27 @@ DETAIL: command not found
COPY contestant FROM '@abs_srcdir@/data/contestants.1.csv' WITH CSV;
-- COPY into uncompressed table from program
COPY contestant FROM PROGRAM 'cat @abs_srcdir@/data/contestants.2.csv' WITH CSV;
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('contestant');
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
---------------------------------------------------------------------
2 | 0 | 3 | 1 | 24742
(1 row)
-- COPY into compressed table
set columnar.compression = 'pglz';
COPY contestant_compressed FROM '@abs_srcdir@/data/contestants.1.csv' WITH CSV;
-- COPY into compressed table from program
COPY contestant_compressed FROM PROGRAM 'cat @abs_srcdir@/data/contestants.2.csv'
WITH CSV;
set columnar.compression to default;
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('contestant_compressed');
version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset
---------------------------------------------------------------------
2 | 0 | 3 | 1 | 24704
(1 row)
-- Test column list
CREATE TABLE famous_constants (id int, name text, value real)
USING columnar;

View File

@ -17,6 +17,10 @@ INSERT INTO test_alter_table SELECT * FROM sample_data;
-- drop a column
ALTER TABLE test_alter_table DROP COLUMN a;
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('test_alter_table');
-- test analyze
ANALYZE test_alter_table;
@ -36,6 +40,10 @@ SELECT * FROM test_alter_table;
INSERT INTO test_alter_table (SELECT 3, 5, 8);
SELECT * FROM test_alter_table;
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('test_alter_table');
-- add a fixed-length column with default value
ALTER TABLE test_alter_table ADD COLUMN e int default 3;
@ -43,6 +51,10 @@ SELECT * from test_alter_table;
INSERT INTO test_alter_table (SELECT 1, 2, 4, 8);
SELECT * from test_alter_table;
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('test_alter_table');
-- add a variable-length column with default value
ALTER TABLE test_alter_table ADD COLUMN f text DEFAULT 'TEXT ME';

View File

@ -7,12 +7,12 @@
CREATE TABLE contestant (handle TEXT, birthdate DATE, rating INT,
percentile FLOAT, country CHAR(3), achievements TEXT[])
USING columnar;
SELECT alter_columnar_table_set('contestant', compression => 'none');
-- should fail
CREATE INDEX contestant_idx on contestant(handle);
-- Create compressed table with automatically determined file path
-- COMPRESSED
-- Create zstd compressed table
CREATE TABLE contestant_compressed (handle TEXT, birthdate DATE, rating INT,
percentile FLOAT, country CHAR(3), achievements TEXT[])
USING columnar;

View File

@ -19,6 +19,14 @@ select count(*) from t_uncompressed;
select * from t_compressed;
select count(*) from t_compressed;
-- check storage
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('t_compressed');
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('t_uncompressed');
-- explain
explain (costs off, summary off, timing off) select * from t_uncompressed;
explain (costs off, summary off, timing off) select * from t_compressed;
@ -31,6 +39,14 @@ vacuum verbose t_uncompressed;
vacuum full t_compressed;
vacuum full t_uncompressed;
-- check storage
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('t_compressed');
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('t_uncompressed');
-- analyze
analyze t_uncompressed;
analyze t_compressed;
@ -43,6 +59,14 @@ truncate t_compressed;
alter table t_uncompressed alter column a type text;
alter table t_compressed alter column a type text;
-- check storage
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('t_compressed');
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('t_uncompressed');
-- verify cost of scanning an empty table is zero, not NaN
explain table t_uncompressed;
explain table t_compressed;

View File

@ -22,6 +22,10 @@ select count(*) from test_insert_command_data;
insert into test_insert_command select * from test_insert_command_data;
select count(*) from test_insert_command;
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('test_insert_command');
SELECT * FROM columnar_test_helpers.chunk_group_consistency;
drop table test_insert_command_data;
@ -99,6 +103,10 @@ SELECT
pg_column_size(external), pg_column_size(extended)
FROM test_toast_columnar;
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('test_toast_columnar');
SELECT * FROM columnar_test_helpers.chunk_group_consistency;
DROP TABLE test_toast_row;
@ -128,6 +136,10 @@ INSERT INTO zero_col_heap SELECT * FROM zero_col_heap;
INSERT INTO zero_col SELECT * FROM zero_col_heap;
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('zero_col');
SELECT relname, stripe_num, chunk_group_count, row_count FROM columnar.stripe a, pg_class b
WHERE columnar_test_helpers.columnar_relation_storageid(b.oid)=a.storage_id AND relname = 'zero_col'
ORDER BY 1,2,3,4;

View File

@ -13,6 +13,10 @@ INSERT INTO t SELECT i, i+1 FROM generate_series(1, 10) i;
ROLLBACK;
SELECT count(*) FROM t;
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('t');
-- check stripe metadata also have been rolled-back
SELECT count(*) FROM t_stripes;
@ -28,6 +32,11 @@ INSERT INTO t SELECT i, i+1 FROM generate_series(1, 10) i;
SELECT count(*) FROM t; -- force flush
SAVEPOINT s1;
INSERT INTO t SELECT i, i+1 FROM generate_series(1, 10) i;
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('t');
SELECT count(*) FROM t;
ROLLBACK TO SAVEPOINT s1;
SELECT count(*) FROM t;
@ -36,6 +45,10 @@ SELECT count(*) FROM t;
INSERT INTO t SELECT i, i+1 FROM generate_series(1, 10) i;
COMMIT;
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('t');
SELECT count(*) FROM t;
SELECT count(*) FROM t_stripes;

View File

@ -5,6 +5,17 @@ CREATE FUNCTION columnar_relation_storageid(relid oid) RETURNS bigint
LANGUAGE C STABLE STRICT
AS 'citus', $$columnar_relation_storageid$$;
CREATE OR REPLACE FUNCTION columnar_storage_info(
rel regclass,
version_major OUT int4,
version_minor OUT int4,
storage_id OUT int8,
reserved_stripe_id OUT int8,
reserved_row_number OUT int8,
reserved_offset OUT int8)
STRICT
LANGUAGE c AS 'citus', $$columnar_storage_info$$;
CREATE FUNCTION compression_type_supported(type text) RETURNS boolean
AS $$
BEGIN

View File

@ -27,8 +27,16 @@ SELECT * FROM columnar_truncate_test;
SELECT * FROM columnar_test_helpers.chunk_group_consistency;
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('columnar_truncate_test');
TRUNCATE TABLE columnar_truncate_test;
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('columnar_truncate_test');
SELECT * FROM columnar_test_helpers.chunk_group_consistency;
SELECT * FROM columnar_truncate_test;

View File

@ -17,6 +17,10 @@ INSERT INTO t SELECT i, i * i FROM generate_series(21, 30) i;
SELECT sum(a), sum(b) FROM t;
SELECT count(*) FROM t_stripes;
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('t');
-- vacuum full should merge stripes together
VACUUM FULL t;
@ -25,6 +29,10 @@ SELECT * FROM columnar_test_helpers.chunk_group_consistency;
SELECT sum(a), sum(b) FROM t;
SELECT count(*) FROM t_stripes;
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('t');
-- test the case when all data cannot fit into a single stripe
SELECT alter_columnar_table_set('t', stripe_row_limit => 1000);
INSERT INTO t SELECT i, 2 * i FROM generate_series(1,2500) i;
@ -34,6 +42,10 @@ SELECT count(*) FROM t_stripes;
VACUUM FULL t;
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('t');
SELECT * FROM columnar_test_helpers.chunk_group_consistency;
SELECT sum(a), sum(b) FROM t;
@ -95,6 +107,9 @@ INSERT INTO t SELECT i / 5 FROM generate_series(1, 1500) i;
COMMIT;
VACUUM VERBOSE t;
select
version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset
from columnar_test_helpers.columnar_storage_info('t');
SELECT * FROM columnar_test_helpers.chunk_group_consistency;

View File

@ -0,0 +1,45 @@
\set upgrade_test_old_citus_version `echo "$upgrade_test_old_citus_version"`
SELECT substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int >= 10 AND
substring(:'upgrade_test_old_citus_version', 'v\d+\.(\d+)\.\d+')::int >= 0
AS upgrade_test_old_citus_version_ge_10_0;
\gset
\if :upgrade_test_old_citus_version_ge_10_0
\else
\q
\endif
-- it's not the best practice to define this here, but we don't want to include
-- columnar_test_helpers in upgrade test schedule
CREATE OR REPLACE FUNCTION columnar_storage_info(
rel regclass,
version_major OUT int4,
version_minor OUT int4,
storage_id OUT int8,
reserved_stripe_id OUT int8,
reserved_row_number OUT int8,
reserved_offset OUT int8)
STRICT
LANGUAGE c AS 'citus', 'columnar_storage_info';
SET search_path TO upgrade_columnar_metapage, public;
-- should work since we upgrade metapages when upgrading schema version
INSERT INTO columnar_table_1 VALUES (3);
-- show that all columnar relations' metapages are upgraded to "2.0"
-- (a metapage is stale if either version component differs, hence "or")
SELECT count(*)=0
FROM (SELECT (columnar_storage_info(c.oid)).* t
FROM pg_class c, pg_am a
WHERE c.relam = a.oid AND amname = 'columnar') t
WHERE t.version_major != 2 or t.version_minor != 0;
-- print metapage for two of the tables
SELECT columnar_storage_info('columnar_table_1');
SELECT columnar_storage_info('columnar_table_2');
-- table is already upgraded, make sure that upgrade_columnar_storage is a no-op
SELECT citus_internal.upgrade_columnar_storage(c.oid)
FROM pg_class c, pg_am a
WHERE c.relam = a.oid AND amname = 'columnar' and relname = 'columnar_table_2';
SELECT columnar_storage_info('columnar_table_2');

View File

@ -0,0 +1,23 @@
\set upgrade_test_old_citus_version `echo "$upgrade_test_old_citus_version"`
SELECT substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int >= 10 AND
substring(:'upgrade_test_old_citus_version', 'v\d+\.(\d+)\.\d+')::int >= 0
AS upgrade_test_old_citus_version_ge_10_0;
\gset
\if :upgrade_test_old_citus_version_ge_10_0
\else
\q
\endif
CREATE SCHEMA upgrade_columnar_metapage;
SET search_path TO upgrade_columnar_metapage, public;
CREATE TABLE columnar_table_1(a INT, b INT) USING columnar;
INSERT INTO columnar_table_1 SELECT i FROM generate_series(160001, 320000) i;
CREATE TABLE columnar_table_2(b INT) USING columnar;
INSERT INTO columnar_table_2 VALUES (160);
CREATE TABLE columnar_table_3(b INT) USING columnar;
INSERT INTO columnar_table_3 VALUES (1), (2);
CREATE TABLE no_data_columnar_table(a INT, b INT, c TEXT) USING columnar;