diff --git a/src/backend/columnar/columnar_debug.c b/src/backend/columnar/columnar_debug.c index 2a772e7da..5b62a3b56 100644 --- a/src/backend/columnar/columnar_debug.c +++ b/src/backend/columnar/columnar_debug.c @@ -12,6 +12,7 @@ #include "pg_config.h" #include "access/nbtree.h" +#include "access/table.h" #include "catalog/pg_am.h" #include "catalog/pg_type.h" #include "distributed/pg_version_constants.h" @@ -25,11 +26,13 @@ #include "utils/tuplestore.h" #include "columnar/columnar.h" +#include "columnar/columnar_storage.h" #include "columnar/columnar_version_compat.h" static void MemoryContextTotals(MemoryContext context, MemoryContextCounters *counters); PG_FUNCTION_INFO_V1(columnar_store_memory_stats); +PG_FUNCTION_INFO_V1(columnar_storage_info); /* @@ -72,6 +75,74 @@ columnar_store_memory_stats(PG_FUNCTION_ARGS) } +/* + * columnar_storage_info - UDF to return internal storage info for a columnar relation. + * + * DDL: + * CREATE OR REPLACE FUNCTION columnar_storage_info( + * rel regclass, + * version_major OUT int4, + * version_minor OUT int4, + * storage_id OUT int8, + * reserved_stripe_id OUT int8, + * reserved_row_number OUT int8, + * reserved_offset OUT int8) + * STRICT + * LANGUAGE c AS 'MODULE_PATHNAME', 'columnar_storage_info'; + */ +Datum +columnar_storage_info(PG_FUNCTION_ARGS) +{ +#define STORAGE_INFO_NATTS 6 + Oid relid = PG_GETARG_OID(0); + TupleDesc tupdesc; + + /* Build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + { + elog(ERROR, "return type must be a row type"); + } + + if (tupdesc->natts != STORAGE_INFO_NATTS) + { + elog(ERROR, "return type must have %d columns", STORAGE_INFO_NATTS); + } + + Relation rel = table_open(relid, AccessShareLock); + if (!IsColumnarTableAmTable(relid)) + { + ereport(ERROR, (errmsg("table \"%s\" is not a columnar table", + RelationGetRelationName(rel)))); + } + + RelationOpenSmgr(rel); + + Datum values[STORAGE_INFO_NATTS] = { 0 }; + bool nulls[STORAGE_INFO_NATTS] = { 0 }; + + /* + * Pass force = true so that we can inspect metapages that are not the + * current version. + * + * NB: ensure the order and number of attributes correspond to DDL + * declaration. + */ + values[0] = Int32GetDatum(ColumnarStorageGetVersionMajor(rel, true)); + values[1] = Int32GetDatum(ColumnarStorageGetVersionMinor(rel, true)); + values[2] = Int64GetDatum(ColumnarStorageGetStorageId(rel, true)); + values[3] = Int64GetDatum(ColumnarStorageGetReservedStripeId(rel, true)); + values[4] = Int64GetDatum(ColumnarStorageGetReservedRowNumber(rel, true)); + values[5] = Int64GetDatum(ColumnarStorageGetReservedOffset(rel, true)); + + /* release lock */ + table_close(rel, AccessShareLock); + + HeapTuple tuple = heap_form_tuple(tupdesc, values, nulls); + + PG_RETURN_DATUM(HeapTupleGetDatum(tuple)); +} + + /* * MemoryContextTotals adds stats of the given memory context and its * subtree to the given counters. diff --git a/src/backend/columnar/columnar_metadata.c b/src/backend/columnar/columnar_metadata.c index b6aab8922..f793d76db 100644 --- a/src/backend/columnar/columnar_metadata.c +++ b/src/backend/columnar/columnar_metadata.c @@ -1,8 +1,19 @@ /*------------------------------------------------------------------------- * - * columnar_metadata_tables.c + * columnar_metadata.c * - * Copyright (c), Citus Data, Inc. + * Copyright (c) Citus Data, Inc. + * + * Manages metadata for columnar relations in separate, shared metadata tables + * in the "columnar" schema. + * + * * holds basic stripe information including data size and row counts + * * holds basic chunk and chunk group information like data offsets and + * min/max values (used for Chunk Group Filtering) + * * useful for fast VACUUM operations (e.g. reporting with VACUUM VERBOSE) + * * useful for stats/costing + * * TODO: maps logical row numbers to stripe IDs + * * TODO: visibility information * *------------------------------------------------------------------------- */ @@ -14,6 +25,7 @@ #include "citus_version.h" #include "columnar/columnar.h" +#include "columnar/columnar_storage.h" #include "columnar/columnar_version_compat.h" #include @@ -30,7 +42,6 @@ #include "commands/sequence.h" #include "commands/trigger.h" #include "distributed/metadata_cache.h" -#include "distributed/resource_lock.h" #include "executor/executor.h" #include "executor/spi.h" #include "miscadmin.h" @@ -48,28 +59,6 @@ #include "utils/relfilenodemap.h" -/* - * Content of the first page in main fork, which stores metadata at file - * level. - */ -typedef struct ColumnarMetapage -{ - /* - * Store version of file format used, so we can detect files from - * previous versions if we change file format. - */ - int versionMajor; - int versionMinor; - - /* - * Each of the metadata table rows are identified by a storageId. - * We store it also in the main fork so we can link metadata rows - * with data files. - */ - uint64 storageId; -} ColumnarMetapage; - - typedef struct { Relation rel; @@ -80,8 +69,6 @@ static void InsertStripeMetadataRow(uint64 storageId, StripeMetadata *stripe); static void GetHighestUsedAddressAndId(uint64 storageId, uint64 *highestUsedAddress, uint64 *highestUsedId); -static void LockForStripeReservation(Relation rel, LOCKMODE mode); -static void UnlockForStripeReservation(Relation rel, LOCKMODE mode); static List * ReadDataFileStripeList(uint64 storageId, Snapshot snapshot); static uint32 * ReadChunkGroupRowCounts(uint64 storageId, uint64 stripe, uint32 chunkGroupCount); @@ -95,6 +82,7 @@ static Oid ColumnarChunkGroupRelationId(void); static Oid ColumnarChunkIndexRelationId(void); static Oid ColumnarChunkGroupIndexRelationId(void); static Oid ColumnarNamespaceId(void); +static uint64 LookupStorageId(RelFileNode relfilenode); static void DeleteStorageFromColumnarMetadataTable(Oid metadataTableId, AttrNumber storageIdAtrrNumber, Oid storageIdIndexId, @@ -107,8 +95,6 @@ static void FinishModifyRelation(ModifyState *state); static EState * create_estate_for_relation(Relation rel); static bytea * DatumToBytea(Datum value, Form_pg_attribute attrForm); static Datum ByteaToDatum(bytea *bytes, Form_pg_attribute attrForm); -static ColumnarMetapage * InitMetapage(Relation relation); -static ColumnarMetapage * ReadMetapage(RelFileNode relfilenode, bool missingOk); static bool WriteColumnarOptions(Oid regclass, ColumnarOptions *options, bool overwrite); PG_FUNCTION_INFO_V1(columnar_relation_storageid); @@ -423,7 +409,7 @@ SaveStripeSkipList(RelFileNode relfilenode, uint64 stripe, StripeSkipList *chunk uint32 chunkIndex = 0; uint32 columnCount = chunkList->columnCount; - ColumnarMetapage *metapage = ReadMetapage(relfilenode, false); + uint64 storageId = LookupStorageId(relfilenode); Oid columnarChunkOid = ColumnarChunkRelationId(); Relation columnarChunk = table_open(columnarChunkOid, RowExclusiveLock); ModifyState *modifyState = StartModifyRelation(columnarChunk); @@ -436,7 +422,7 @@ SaveStripeSkipList(RelFileNode relfilenode, uint64 stripe, StripeSkipList *chunk &chunkList->chunkSkipNodeArray[columnIndex][chunkIndex]; Datum values[Natts_columnar_chunk] = { - UInt64GetDatum(metapage->storageId), + UInt64GetDatum(storageId), Int64GetDatum(stripe), Int32GetDatum(columnIndex + 1), Int32GetDatum(chunkIndex), @@ -487,7 +473,7 @@ void SaveChunkGroups(RelFileNode relfilenode, uint64 stripe, List *chunkGroupRowCounts) { - ColumnarMetapage *metapage = ReadMetapage(relfilenode, false); + uint64 storageId = LookupStorageId(relfilenode); Oid columnarChunkGroupOid = ColumnarChunkGroupRelationId(); Relation columnarChunkGroup = table_open(columnarChunkGroupOid, RowExclusiveLock); ModifyState *modifyState = StartModifyRelation(columnarChunkGroup); @@ -499,7 +485,7 @@ SaveChunkGroups(RelFileNode relfilenode, uint64 stripe, { int64 rowCount = lfirst_int(lc); Datum values[Natts_columnar_chunkgroup] = { - UInt64GetDatum(metapage->storageId), + UInt64GetDatum(storageId), Int64GetDatum(stripe), Int32GetDatum(chunkId), Int64GetDatum(rowCount) @@ -530,14 +516,14 @@ ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescri uint32 columnCount = tupleDescriptor->natts; ScanKeyData scanKey[2]; - ColumnarMetapage *metapage = ReadMetapage(relfilenode, false); + uint64 storageId = LookupStorageId(relfilenode); Oid columnarChunkOid = ColumnarChunkRelationId(); Relation columnarChunk = table_open(columnarChunkOid, AccessShareLock); Relation index = index_open(ColumnarChunkIndexRelationId(), AccessShareLock); ScanKeyInit(&scanKey[0], Anum_columnar_chunk_storageid, - BTEqualStrategyNumber, F_OIDEQ, UInt64GetDatum(metapage->storageId)); + BTEqualStrategyNumber, F_OIDEQ, UInt64GetDatum(storageId)); ScanKeyInit(&scanKey[1], Anum_columnar_chunk_stripe, BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(stripe)); @@ -624,7 +610,7 @@ ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescri table_close(columnarChunk, AccessShareLock); chunkList->chunkGroupRowCounts = - ReadChunkGroupRowCounts(metapage->storageId, stripe, chunkCount); + ReadChunkGroupRowCounts(storageId, stripe, chunkCount); return chunkList; } @@ -729,15 +715,9 @@ InsertStripeMetadataRow(uint64 storageId, StripeMetadata *stripe) List * StripesForRelfilenode(RelFileNode relfilenode) { - ColumnarMetapage *metapage = ReadMetapage(relfilenode, true); - if (metapage == NULL) - { - /* empty relation */ - return NIL; - } + uint64 storageId = LookupStorageId(relfilenode); - - return ReadDataFileStripeList(metapage->storageId, GetTransactionSnapshot()); + return ReadDataFileStripeList(storageId, GetTransactionSnapshot()); } @@ -752,17 +732,11 @@ StripesForRelfilenode(RelFileNode relfilenode) uint64 GetHighestUsedAddress(RelFileNode relfilenode) { + uint64 storageId = LookupStorageId(relfilenode); + uint64 highestUsedAddress = 0; uint64 highestUsedId = 0; - ColumnarMetapage *metapage = ReadMetapage(relfilenode, true); - - /* empty data file? */ - if (metapage == NULL) - { - return 0; - } - - GetHighestUsedAddressAndId(metapage->storageId, &highestUsedAddress, &highestUsedId); + GetHighestUsedAddressAndId(storageId, &highestUsedAddress, &highestUsedId); return highestUsedAddress; } @@ -799,35 +773,6 @@ GetHighestUsedAddressAndId(uint64 storageId, } -/* - * LockForStripeReservation acquires a lock for stripe reservation. - */ -static void -LockForStripeReservation(Relation rel, LOCKMODE mode) -{ - /* - * We use an advisory lock here so we can easily detect these kind of - * locks in IsProcessWaitingForSafeOperations() and don't include them - * in the lock graph. - */ - LOCKTAG tag; - SET_LOCKTAG_COLUMNAR_STRIPE_RESERVATION(tag, rel); - LockAcquire(&tag, mode, false, false); -} - - -/* - * UnlockForStripeReservation releases the stripe reservation lock. - */ -static void -UnlockForStripeReservation(Relation rel, LOCKMODE mode) -{ - LOCKTAG tag; - SET_LOCKTAG_COLUMNAR_STRIPE_RESERVATION(tag, rel); - LockRelease(&tag, mode, false); -} - - /* * ReserveStripe reserves and stripe of given size for the given relation, * and inserts it into columnar.stripe. It is guaranteed that concurrent @@ -839,47 +784,16 @@ ReserveStripe(Relation rel, uint64 sizeBytes, uint64 chunkCount, uint64 chunkGroupRowCount) { StripeMetadata stripe = { 0 }; - uint64 currLogicalHigh = 0; - uint64 highestId = 0; + + uint64 storageId = ColumnarStorageGetStorageId(rel, false); /* - * We take ExclusiveLock here, so two space reservations conflict. + * TODO: For now, we don't use row number reservation at all, so just use + * dummy values. */ - LOCKMODE lockMode = ExclusiveLock; - LockForStripeReservation(rel, lockMode); - - RelFileNode relfilenode = rel->rd_node; - - /* - * If this is the first stripe for this relation, initialize the - * metapage, otherwise use the previously initialized metapage. - */ - ColumnarMetapage *metapage = ReadMetapage(relfilenode, true); - if (metapage == NULL) - { - metapage = InitMetapage(rel); - } - - GetHighestUsedAddressAndId(metapage->storageId, &currLogicalHigh, &highestId); - SmgrAddr currSmgrHigh = logical_to_smgr(currLogicalHigh); - - SmgrAddr resSmgrStart = next_block_start(currSmgrHigh); - uint64 resLogicalStart = smgr_to_logical(resSmgrStart); - - uint64 resLogicalEnd = resLogicalStart + sizeBytes - 1; - SmgrAddr resSmgrEnd = logical_to_smgr(resLogicalEnd); - - RelationOpenSmgr(rel); - uint64 nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); - - while (resSmgrEnd.blockno >= nblocks) - { - Buffer newBuffer = ReadBuffer(rel, P_NEW); - ReleaseBuffer(newBuffer); - nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); - } - - RelationCloseSmgr(rel); + uint64 firstReservedRow; + uint64 stripeId = ColumnarStorageReserveStripe(rel, 0, &firstReservedRow); + uint64 resLogicalStart = ColumnarStorageReserveData(rel, sizeBytes); stripe.fileOffset = resLogicalStart; stripe.dataLength = sizeBytes; @@ -887,11 +801,9 @@ ReserveStripe(Relation rel, uint64 sizeBytes, stripe.chunkGroupRowCount = chunkGroupRowCount; stripe.columnCount = columnCount; stripe.rowCount = rowCount; - stripe.id = highestId + 1; + stripe.id = stripeId; - InsertStripeMetadataRow(metapage->storageId, &stripe); - - UnlockForStripeReservation(rel, lockMode); + InsertStripeMetadataRow(storageId, &stripe); return stripe; } @@ -970,28 +882,20 @@ DeleteMetadataRows(RelFileNode relfilenode) return; } - ColumnarMetapage *metapage = ReadMetapage(relfilenode, true); - if (metapage == NULL) - { - /* - * No data has been written to this storage yet, so there is no - * associated metadata yet. - */ - return; - } + uint64 storageId = LookupStorageId(relfilenode); DeleteStorageFromColumnarMetadataTable(ColumnarStripeRelationId(), Anum_columnar_stripe_storageid, ColumnarStripeIndexRelationId(), - metapage->storageId); + storageId); DeleteStorageFromColumnarMetadataTable(ColumnarChunkGroupRelationId(), Anum_columnar_chunkgroup_storageid, ColumnarChunkGroupIndexRelationId(), - metapage->storageId); + storageId); DeleteStorageFromColumnarMetadataTable(ColumnarChunkRelationId(), Anum_columnar_chunk_storageid, ColumnarChunkIndexRelationId(), - metapage->storageId); + storageId); } @@ -1312,75 +1216,31 @@ ColumnarNamespaceId(void) /* - * ReadMetapage reads metapage for the given relfilenode. It returns + * LookupStorageId reads storage metapage to find the storage ID for the given relfilenode. It returns * false if the relation doesn't have a meta page yet. */ -static ColumnarMetapage * -ReadMetapage(RelFileNode relfilenode, bool missingOk) +static uint64 +LookupStorageId(RelFileNode relfilenode) { - StringInfo metapageBuffer = NULL; Oid relationId = RelidByRelfilenode(relfilenode.spcNode, relfilenode.relNode); - if (OidIsValid(relationId)) - { - Relation relation = relation_open(relationId, NoLock); - RelationOpenSmgr(relation); - int nblocks = smgrnblocks(relation->rd_smgr, MAIN_FORKNUM); - RelationCloseSmgr(relation); + Relation relation = relation_open(relationId, AccessShareLock); + uint64 storageId = ColumnarStorageGetStorageId(relation, false); + table_close(relation, AccessShareLock); - if (nblocks != 0) - { - metapageBuffer = ReadFromSmgr(relation, 0, sizeof(ColumnarMetapage)); - } - - relation_close(relation, NoLock); - } - - if (metapageBuffer == NULL) - { - if (!missingOk) - { - elog(ERROR, "columnar metapage was not found"); - } - - return NULL; - } - - ColumnarMetapage *metapage = palloc0(sizeof(ColumnarMetapage)); - memcpy_s((void *) metapage, sizeof(ColumnarMetapage), - metapageBuffer->data, sizeof(ColumnarMetapage)); - - return metapage; + return storageId; } /* - * InitMetapage initializes metapage for the given relation. + * ColumnarMetadataNewStorageId - create a new, unique storage id and return + * it. */ -static ColumnarMetapage * -InitMetapage(Relation relation) +uint64 +ColumnarMetadataNewStorageId() { - /* - * If we init metapage during upgrade, we might override the - * pre-upgrade storage id which will render pre-upgrade data - * invisible. - */ - Assert(!IsBinaryUpgrade); - ColumnarMetapage *metapage = palloc0(sizeof(ColumnarMetapage)); - - metapage->storageId = nextval_internal(ColumnarStorageIdSequenceRelationId(), false); - metapage->versionMajor = COLUMNAR_VERSION_MAJOR; - metapage->versionMinor = COLUMNAR_VERSION_MINOR; - - /* create the first block */ - Buffer newBuffer = ReadBuffer(relation, P_NEW); - ReleaseBuffer(newBuffer); - - Assert(sizeof(ColumnarMetapage) <= BLCKSZ - SizeOfPageHeaderData); - WriteToSmgr(relation, 0, (char *) metapage, sizeof(ColumnarMetapage)); - - return metapage; + return nextval_internal(ColumnarStorageIdSequenceRelationId(), false); } @@ -1391,20 +1251,53 @@ InitMetapage(Relation relation) Datum columnar_relation_storageid(PG_FUNCTION_ARGS) { - uint64 storageId = -1; - Oid relationId = PG_GETARG_OID(0); Relation relation = relation_open(relationId, AccessShareLock); - if (IsColumnarTableAmTable(relationId)) + if (!IsColumnarTableAmTable(relationId)) { - ColumnarMetapage *metadata = ReadMetapage(relation->rd_node, true); - if (metadata != NULL) - { - storageId = metadata->storageId; - } + elog(ERROR, "relation \"%s\" is not a columnar table", + RelationGetRelationName(relation)); } + uint64 storageId = ColumnarStorageGetStorageId(relation, false); + relation_close(relation, AccessShareLock); PG_RETURN_INT64(storageId); } + + +/* + * ColumnarStorageUpdateIfNeeded - upgrade columnar storage to the current version by + * using information from the metadata tables. + */ +void +ColumnarStorageUpdateIfNeeded(Relation rel, bool isUpgrade) +{ + if (ColumnarStorageIsCurrent(rel)) + { + return; + } + + RelationOpenSmgr(rel); + BlockNumber nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); + if (nblocks < 2) + { + ColumnarStorageInit(rel->rd_smgr, ColumnarMetadataNewStorageId()); + return; + } + + uint64 storageId = ColumnarStorageGetStorageId(rel, true); + + uint64 highestId; + uint64 highestOffset; + GetHighestUsedAddressAndId(storageId, &highestOffset, &highestId); + + uint64 reservedStripeId = highestId + 1; + + /* XXX: should be set properly */ + uint64 reservedRowNumber = 0; + uint64 reservedOffset = highestOffset + 1; + ColumnarStorageUpdateCurrent(rel, isUpgrade, reservedStripeId, + reservedRowNumber, reservedOffset); +} diff --git a/src/backend/columnar/columnar_reader.c b/src/backend/columnar/columnar_reader.c index a4939b763..e4922bb13 100644 --- a/src/backend/columnar/columnar_reader.c +++ b/src/backend/columnar/columnar_reader.c @@ -34,6 +34,7 @@ #include "utils/rel.h" #include "columnar/columnar.h" +#include "columnar/columnar_storage.h" #include "columnar/columnar_version_compat.h" typedef struct ChunkGroupReadState @@ -667,8 +668,12 @@ LoadColumnBuffers(Relation relation, ColumnChunkSkipNode *chunkSkipNodeArray, { ColumnChunkSkipNode *chunkSkipNode = &chunkSkipNodeArray[chunkIndex]; uint64 existsOffset = stripeOffset + chunkSkipNode->existsChunkOffset; - StringInfo rawExistsBuffer = ReadFromSmgr(relation, existsOffset, - chunkSkipNode->existsLength); + StringInfo rawExistsBuffer = makeStringInfo(); + + enlargeStringInfo(rawExistsBuffer, chunkSkipNode->existsLength); + rawExistsBuffer->len = chunkSkipNode->existsLength; + ColumnarStorageRead(relation, existsOffset, rawExistsBuffer->data, + chunkSkipNode->existsLength); chunkBuffersArray[chunkIndex]->existsBuffer = rawExistsBuffer; } @@ -679,8 +684,12 @@ LoadColumnBuffers(Relation relation, ColumnChunkSkipNode *chunkSkipNodeArray, ColumnChunkSkipNode *chunkSkipNode = &chunkSkipNodeArray[chunkIndex]; CompressionType compressionType = chunkSkipNode->valueCompressionType; uint64 valueOffset = stripeOffset + chunkSkipNode->valueChunkOffset; - StringInfo rawValueBuffer = ReadFromSmgr(relation, valueOffset, - chunkSkipNode->valueLength); + StringInfo rawValueBuffer = makeStringInfo(); + + enlargeStringInfo(rawValueBuffer, chunkSkipNode->valueLength); + rawValueBuffer->len = chunkSkipNode->valueLength; + ColumnarStorageRead(relation, valueOffset, rawValueBuffer->data, + chunkSkipNode->valueLength); chunkBuffersArray[chunkIndex]->valueBuffer = rawValueBuffer; chunkBuffersArray[chunkIndex]->valueCompressionType = compressionType; @@ -1269,30 +1278,3 @@ ColumnDefaultValue(TupleConstr *tupleConstraints, Form_pg_attribute attributeFor "does not evaluate to constant value"))); } } - - -StringInfo -ReadFromSmgr(Relation rel, uint64 offset, uint32 size) -{ - StringInfo resultBuffer = makeStringInfo(); - uint64 read = 0; - - enlargeStringInfo(resultBuffer, size); - resultBuffer->len = size; - - while (read < size) - { - SmgrAddr addr = logical_to_smgr(offset + read); - - Buffer buffer = ReadBuffer(rel, addr.blockno); - Page page = BufferGetPage(buffer); - PageHeader phdr = (PageHeader) page; - - uint32 to_read = Min(size - read, phdr->pd_upper - addr.offset); - memcpy_s(resultBuffer->data + read, size - read, page + addr.offset, to_read); - ReleaseBuffer(buffer); - read += to_read; - } - - return resultBuffer; -} diff --git a/src/backend/columnar/columnar_storage.c b/src/backend/columnar/columnar_storage.c new file mode 100644 index 000000000..e0a485e61 --- /dev/null +++ b/src/backend/columnar/columnar_storage.c @@ -0,0 +1,762 @@ +/*------------------------------------------------------------------------- + * + * columnar_storage.c + * + * Copyright (c) Citus Data, Inc. + * + * Low-level storage layer for columnar. + * - Translates columnar read/write operations on logical offsets into operations on pages/blocks. + * - Emits WAL. + * - Reads/writes the columnar metapage. + * - Reserves data offsets, stripe numbers, and row offsets. + * - Truncation. + * + * Higher-level columnar operations deal with logical offsets and large + * contiguous buffers of data that need to be stored. But the buffer manager + * and WAL depend on formatted pages with headers, so these large buffers need + * to be written across many pages. This module translates the contiguous + * buffers into individual block reads/writes, and performs WAL when + * necessary. + * + * Storage layout: a metapage in block 0, followed by an empty page in block + * 1, followed by logical data starting at the first byte after the page + * header in block 2 (having logical offset ColumnarFirstLogicalOffset). (XXX: + * Block 1 is left empty for no particular reason. Reconsider?). A columnar + * table should always have at least 2 blocks. + * + * Reservation is done with a relation extension lock, and designed for + * concurrency, so the callers only need an ordinary lock on the + * relation. Initializing the metapage or truncating the relation require that + * the caller holds an AccessExclusiveLock. (XXX: New reservations of data are + * aligned onto a new page for no particular reason. Reconsider?). + * + *------------------------------------------------------------------------- + */ + + +#include "postgres.h" + +#include "safe_lib.h" + +#include "catalog/storage.h" +#include "miscadmin.h" +#include "storage/bufmgr.h" +#include "storage/lmgr.h" + +#include "columnar/columnar.h" +#include "columnar/columnar_storage.h" + + +/* + * Content of the first page in main fork, which stores metadata at file + * level. + */ +typedef struct ColumnarMetapage +{ + /* + * Store version of file format used, so we can detect files from + * previous versions if we change file format. + */ + uint32 versionMajor; + uint32 versionMinor; + + /* + * Each of the metadata table rows are identified by a storageId. + * We store it also in the main fork so we can link metadata rows + * with data files. + */ + uint64 storageId; + + uint64 reservedStripeId; /* first unused stripe id */ + uint64 reservedRowNumber; /* first unused row number */ + uint64 reservedOffset; /* first unused byte offset */ +} ColumnarMetapage; + + +/* represents a "physical" block+offset address */ +typedef struct PhysicalAddr +{ + BlockNumber blockno; + uint32 offset; +} PhysicalAddr; + + +#define COLUMNAR_METAPAGE_BLOCKNO 0 +#define COLUMNAR_EMPTY_BLOCKNO 1 +#define COLUMNAR_INVALID_STRIPE_ID 0 +#define COLUMNAR_FIRST_STRIPE_ID 1 +#define COLUMNAR_INVALID_ROW_NUMBER 0 +#define COLUMNAR_FIRST_ROW_NUMBER 1 + + +/* + * Map logical offsets to a physical page and offset where the data is kept. + */ +static inline PhysicalAddr +LogicalToPhysical(uint64 logicalOffset) +{ + PhysicalAddr addr; + + addr.blockno = logicalOffset / COLUMNAR_BYTES_PER_PAGE; + addr.offset = SizeOfPageHeaderData + (logicalOffset % COLUMNAR_BYTES_PER_PAGE); + + return addr; +} + + +/* + * Map a physical page and offset address to a logical address. + */ +static inline uint64 +PhysicalToLogical(PhysicalAddr addr) +{ + return COLUMNAR_BYTES_PER_PAGE * addr.blockno + addr.offset - SizeOfPageHeaderData; +} + + +static ColumnarMetapage ColumnarMetapageRead(Relation rel, bool force); +static void ReadFromBlock(Relation rel, BlockNumber blockno, uint32 offset, + char *buf, uint32 len, bool force); +static void WriteToBlock(Relation rel, BlockNumber blockno, uint32 offset, + char *buf, uint32 len, bool clear); +static uint64 AlignReservation(uint64 prevReservation); +static bool ColumnarMetapageIsCurrent(ColumnarMetapage *metapage); +static bool ColumnarMetapageIsOlder(ColumnarMetapage *metapage); +static bool ColumnarMetapageIsNewer(ColumnarMetapage *metapage); +static void ColumnarMetapageCheckVersion(Relation rel, ColumnarMetapage *metapage); + + +/* + * ColumnarStorageInit - initialize a new metapage in an empty relation + * with the given storageId. + * + * Caller must hold AccessExclusiveLock on the relation. + */ +void +ColumnarStorageInit(SMgrRelation srel, uint64 storageId) +{ + BlockNumber nblocks = smgrnblocks(srel, MAIN_FORKNUM); + + if (nblocks > 0) + { + elog(ERROR, + "attempted to initialize metapage, but %d pages already exist", + nblocks); + } + + /* create two pages */ + PGAlignedBlock block; + Page page = block.data; + + /* write metapage */ + PageInit(page, BLCKSZ, 0); + PageHeader phdr = (PageHeader) page; + + ColumnarMetapage metapage = { 0 }; + metapage.storageId = storageId; + metapage.versionMajor = COLUMNAR_VERSION_MAJOR; + metapage.versionMinor = COLUMNAR_VERSION_MINOR; + metapage.reservedStripeId = COLUMNAR_FIRST_STRIPE_ID; + metapage.reservedRowNumber = COLUMNAR_FIRST_ROW_NUMBER; + metapage.reservedOffset = ColumnarFirstLogicalOffset; + memcpy_s(page + phdr->pd_lower, phdr->pd_upper - phdr->pd_lower, + (char *) &metapage, sizeof(ColumnarMetapage)); + phdr->pd_lower += sizeof(ColumnarMetapage); + + PageSetChecksumInplace(page, COLUMNAR_METAPAGE_BLOCKNO); + smgrwrite(srel, MAIN_FORKNUM, COLUMNAR_METAPAGE_BLOCKNO, page, true); + log_newpage(&srel->smgr_rnode.node, MAIN_FORKNUM, + COLUMNAR_METAPAGE_BLOCKNO, page, true); + + /* write empty page */ + PageInit(page, BLCKSZ, 0); + + PageSetChecksumInplace(page, COLUMNAR_EMPTY_BLOCKNO); + smgrwrite(srel, MAIN_FORKNUM, COLUMNAR_EMPTY_BLOCKNO, page, true); + log_newpage(&srel->smgr_rnode.node, MAIN_FORKNUM, + COLUMNAR_EMPTY_BLOCKNO, page, true); + + /* + * An immediate sync is required even if we xlog'd the page, because the + * write did not go through shared_buffers and therefore a concurrent + * checkpoint may have moved the redo pointer past our xlog record. + */ + smgrimmedsync(srel, MAIN_FORKNUM); +} + + +/* + * ColumnarStorageUpdateCurrent - update the metapage to the current + * version. No effect if the version already matches. If 'upgrade' is true, + * throw an error if metapage version is newer; if 'upgrade' is false, it's a + * downgrade, so throw an error if the metapage version is older. + * + * NB: caller must ensure that metapage already exists, which might not be the + * case on 10.0. + */ +void +ColumnarStorageUpdateCurrent(Relation rel, bool upgrade, uint64 reservedStripeId, + uint64 reservedRowNumber, uint64 reservedOffset) +{ + LockRelationForExtension(rel, ExclusiveLock); + + ColumnarMetapage metapage = ColumnarMetapageRead(rel, true); + + if (ColumnarMetapageIsCurrent(&metapage)) + { + /* nothing to do */ + return; + } + + if (upgrade && ColumnarMetapageIsNewer(&metapage)) + { + elog(ERROR, "found newer columnar metapage while upgrading"); + } + + if (!upgrade && ColumnarMetapageIsOlder(&metapage)) + { + elog(ERROR, "found older columnar metapage while downgrading"); + } + + metapage.versionMajor = COLUMNAR_VERSION_MAJOR; + metapage.versionMinor = COLUMNAR_VERSION_MINOR; + + /* storageId remains the same */ + metapage.reservedStripeId = reservedStripeId; + metapage.reservedRowNumber = reservedRowNumber; + metapage.reservedOffset = reservedOffset; + + WriteToBlock(rel, COLUMNAR_METAPAGE_BLOCKNO, SizeOfPageHeaderData, + (char *) &metapage, sizeof(ColumnarMetapage), + true /* clear because we are overwriting */); + + UnlockRelationForExtension(rel, ExclusiveLock); +} + + +/* + * ColumnarStorageGetVersionMajor - return major version from the metapage. + * + * Throw an error if the metapage is not the current version, unless + * 'force' is true. + */ +uint64 +ColumnarStorageGetVersionMajor(Relation rel, bool force) +{ + ColumnarMetapage metapage = ColumnarMetapageRead(rel, force); + + return metapage.versionMajor; +} + + +/* + * ColumnarStorageGetVersionMinor - return minor version from the metapage. + * + * Throw an error if the metapage is not the current version, unless + * 'force' is true. + */ +uint64 +ColumnarStorageGetVersionMinor(Relation rel, bool force) +{ + ColumnarMetapage metapage = ColumnarMetapageRead(rel, force); + + return metapage.versionMinor; +} + + +/* + * ColumnarStorageGetStorageId - return storage ID from the metapage. + * + * Throw an error if the metapage is not the current version, unless + * 'force' is true. + */ +uint64 +ColumnarStorageGetStorageId(Relation rel, bool force) +{ + ColumnarMetapage metapage = ColumnarMetapageRead(rel, force); + + return metapage.storageId; +} + + +/* + * ColumnarStorageGetReservedStripeId - return reserved stripe ID from the + * metapage. + * + * Throw an error if the metapage is not the current version, unless + * 'force' is true. + */ +uint64 +ColumnarStorageGetReservedStripeId(Relation rel, bool force) +{ + ColumnarMetapage metapage = ColumnarMetapageRead(rel, force); + + return metapage.reservedStripeId; +} + + +/* + * ColumnarStorageGetReservedRowNumber - return reserved row number from the + * metapage. + * + * Throw an error if the metapage is not the current version, unless + * 'force' is true. + */ +uint64 +ColumnarStorageGetReservedRowNumber(Relation rel, bool force) +{ + ColumnarMetapage metapage = ColumnarMetapageRead(rel, force); + + return metapage.reservedRowNumber; +} + + +/* + * ColumnarStorageGetReservedOffset - return reserved offset from the metapage. + * + * Throw an error if the metapage is not the current version, unless + * 'force' is true. + */ +uint64 +ColumnarStorageGetReservedOffset(Relation rel, bool force) +{ + ColumnarMetapage metapage = ColumnarMetapageRead(rel, force); + + return metapage.reservedOffset; +} + + +/* + * ColumnarMetapageNeedsUpgrade - return true if metapage exists and is not + * the current version. + */ +bool +ColumnarStorageIsCurrent(Relation rel) +{ + RelationOpenSmgr(rel); + BlockNumber nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); + + if (nblocks < 2) + { + return false; + } + + ColumnarMetapage metapage = ColumnarMetapageRead(rel, true); + return ColumnarMetapageIsCurrent(&metapage); +} + + +/* + * ColumnarStorageReserveStripe - reserve stripe ID and row numbers. + */ +uint64 +ColumnarStorageReserveStripe(Relation rel, uint64 nrows, uint64 *firstRowNumber) +{ + LockRelationForExtension(rel, ExclusiveLock); + + ColumnarMetapage metapage = ColumnarMetapageRead(rel, false); + + uint64 stripeId = metapage.reservedStripeId; + metapage.reservedStripeId++; + + *firstRowNumber = metapage.reservedRowNumber; + metapage.reservedRowNumber += nrows; + + WriteToBlock(rel, COLUMNAR_METAPAGE_BLOCKNO, SizeOfPageHeaderData, + (char *) &metapage, sizeof(ColumnarMetapage), + true /* clear because we are overwriting */); + + UnlockRelationForExtension(rel, ExclusiveLock); + + return stripeId; +} + + +/* + * ColumnarStorageReserveData - reserve logical data offsets for writing. + */ +uint64 +ColumnarStorageReserveData(Relation rel, uint64 amount) +{ + if (amount == 0) + { + return ColumnarInvalidLogicalOffset; + } + + LockRelationForExtension(rel, ExclusiveLock); + + ColumnarMetapage metapage = ColumnarMetapageRead(rel, false); + + uint64 alignedReservation = AlignReservation(metapage.reservedOffset); + uint64 nextReservation = alignedReservation + amount; + metapage.reservedOffset = nextReservation; + + /* write new reservation */ + WriteToBlock(rel, COLUMNAR_METAPAGE_BLOCKNO, SizeOfPageHeaderData, + (char *) &metapage, sizeof(ColumnarMetapage), + true /* clear because we are overwriting */); + + /* last used PhysicalAddr of new reservation */ + PhysicalAddr final = LogicalToPhysical(nextReservation - 1); + + /* extend with new pages */ + RelationOpenSmgr(rel); + BlockNumber nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); + + while (nblocks <= final.blockno) + { + Buffer newBuffer = ReadBuffer(rel, P_NEW); + Assert(BufferGetBlockNumber(newBuffer) == nblocks); + ReleaseBuffer(newBuffer); + nblocks++; + } + + UnlockRelationForExtension(rel, ExclusiveLock); + + return alignedReservation; +} + + +/* + * ColumnarStorageRead - map the logical offset to a block and offset, then + * read the buffer from multiple blocks if necessary. + */ +void +ColumnarStorageRead(Relation rel, uint64 logicalOffset, char *data, uint32 amount) +{ + /* if there's no work to do, succeed even with invalid offset */ + if (amount == 0) + { + return; + } + + if (!ColumnarLogicalOffsetIsValid(logicalOffset)) + { + elog(ERROR, + "attempted columnar read on relation %d from invalid logical offset: " + UINT64_FORMAT, + rel->rd_id, logicalOffset); + } + + uint64 read = 0; + + while (read < amount) + { + PhysicalAddr addr = LogicalToPhysical(logicalOffset + read); + + uint32 to_read = Min(amount - read, BLCKSZ - addr.offset); + ReadFromBlock(rel, addr.blockno, addr.offset, data + read, to_read, + false); + + read += to_read; + } +} + + +/* + * ColumnarStorageWrite - map the logical offset to a block and offset, then + * write the buffer across multiple blocks if necessary. + */ +void +ColumnarStorageWrite(Relation rel, uint64 logicalOffset, char *data, uint32 amount) +{ + /* if there's no work to do, succeed even with invalid offset */ + if (amount == 0) + { + return; + } + + if (!ColumnarLogicalOffsetIsValid(logicalOffset)) + { + elog(ERROR, + "attempted columnar write on relation %d to invalid logical offset: " + UINT64_FORMAT, + rel->rd_id, logicalOffset); + } + + uint64 written = 0; + + while (written < amount) + { + PhysicalAddr addr = LogicalToPhysical(logicalOffset + written); + + uint64 to_write = Min(amount - written, BLCKSZ - addr.offset); + WriteToBlock(rel, addr.blockno, addr.offset, data + written, to_write, + false); + + written += to_write; + } +} + + +/* + * ColumnarStorageTruncate - truncate the columnar storage such that + * newDataReservation will be the first unused logical offset available. Free + * pages at the end of the relation. + * + * Caller must hold AccessExclusiveLock on the relation. + * + * Returns true if pages were truncated; false otherwise. + */ +bool +ColumnarStorageTruncate(Relation rel, uint64 newDataReservation) +{ + if (!ColumnarLogicalOffsetIsValid(newDataReservation)) + { + elog(ERROR, + "attempted to truncate relation %d to invalid logical offset: " UINT64_FORMAT, + rel->rd_id, newDataReservation); + } + + RelationOpenSmgr(rel); + BlockNumber old_rel_pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); + if (old_rel_pages == 0) + { + /* nothing to do */ + return false; + } + + LockRelationForExtension(rel, ExclusiveLock); + + ColumnarMetapage metapage = ColumnarMetapageRead(rel, false); + + if (metapage.reservedOffset < newDataReservation) + { + elog(ERROR, + "attempted to truncate relation %d to offset " UINT64_FORMAT \ + " which is higher than existing offset " UINT64_FORMAT, + rel->rd_id, newDataReservation, metapage.reservedOffset); + } + + if (metapage.reservedOffset == newDataReservation) + { + /* nothing to do */ + UnlockRelationForExtension(rel, ExclusiveLock); + return false; + } + + metapage.reservedOffset = newDataReservation; + + /* write new reservation */ + WriteToBlock(rel, COLUMNAR_METAPAGE_BLOCKNO, SizeOfPageHeaderData, + (char *) &metapage, sizeof(ColumnarMetapage), + true /* clear because we are overwriting */); + + UnlockRelationForExtension(rel, ExclusiveLock); + + PhysicalAddr final = LogicalToPhysical(newDataReservation - 1); + BlockNumber new_rel_pages = final.blockno + 1; + Assert(new_rel_pages <= old_rel_pages); + + /* + * Truncate the storage. Note that RelationTruncate() takes care of + * Write Ahead Logging. + */ + if (new_rel_pages < old_rel_pages) + { + RelationTruncate(rel, new_rel_pages); + return true; + } + + return false; +} + + +/* + * ColumnarMetapageRead - read the current contents of the metapage. Error if + * it does not exist. Throw an error if the metapage is not the current + * version, unless 'force' is true. + * + * NB: it's safe to read a different version of a metapage because we + * guarantee that fields will only be added and existing fields will never be + * changed. However, it's important that we don't depend on new fields being + * set properly when we read an old metapage; an old metapage should only be + * read for the purposes of upgrading or error checking. + */ +static ColumnarMetapage +ColumnarMetapageRead(Relation rel, bool force) +{ + RelationOpenSmgr(rel); + BlockNumber nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); + if (nblocks == 0) + { + elog(ERROR, "columnar metapage for relation \"%s\" does not exist", + RelationGetRelationName(rel)); + } + + ColumnarMetapage metapage; + ReadFromBlock(rel, COLUMNAR_METAPAGE_BLOCKNO, SizeOfPageHeaderData, + (char *) &metapage, sizeof(ColumnarMetapage), force); + + if (!force) + { + ColumnarMetapageCheckVersion(rel, &metapage); + } + + return metapage; +} + + +/* + * ReadFromBlock - read bytes from a page at the given offset. If 'force' is + * true, don't check pd_lower; useful when reading a metapage of unknown + * version. + */ +static void +ReadFromBlock(Relation rel, BlockNumber blockno, uint32 offset, char *buf, + uint32 len, bool force) +{ + Buffer buffer = ReadBuffer(rel, blockno); + Page page = BufferGetPage(buffer); + PageHeader phdr = (PageHeader) page; + + if (BLCKSZ < offset + len || (!force && (phdr->pd_lower < offset + len))) + { + elog(ERROR, + "attempt to read columnar data of length %d from offset %d of block %d of relation %d", + len, offset, blockno, rel->rd_id); + } + + memcpy_s(buf, len, page + offset, len); + ReleaseBuffer(buffer); +} + + +/* + * WriteToBlock - append data to a block, initializing if necessary, and emit + * WAL. If 'clear' is true, always clear the data on the page and reinitialize + * it first, and offset must be SizeOfPageHeaderData. Otherwise, offset must + * be equal to pd_lower and pd_lower will be set to the end of the written + * data. + */ +static void +WriteToBlock(Relation rel, BlockNumber blockno, uint32 offset, char *buf, + uint32 len, bool clear) +{ + Buffer buffer = ReadBuffer(rel, blockno); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + + Page page = BufferGetPage(buffer); + PageHeader phdr = (PageHeader) page; + if (PageIsNew(page) || clear) + { + PageInit(page, BLCKSZ, 0); + } + + if (phdr->pd_lower != offset || phdr->pd_upper - offset < len) + { + elog(ERROR, + "attempt to write columnar data of length %d to offset %d of block %d of relation %d", + len, offset, blockno, rel->rd_id); + } + + START_CRIT_SECTION(); + + memcpy_s(page + phdr->pd_lower, phdr->pd_upper - phdr->pd_lower, buf, len); + phdr->pd_lower += len; + + MarkBufferDirty(buffer); + + if (RelationNeedsWAL(rel)) + { + XLogBeginInsert(); + + /* + * Since columnar will mostly write whole pages we force the transmission of the + * whole image in the buffer + */ + XLogRegisterBuffer(0, buffer, REGBUF_FORCE_IMAGE); + + XLogRecPtr recptr = XLogInsert(RM_GENERIC_ID, 0); + PageSetLSN(page, recptr); + } + + END_CRIT_SECTION(); + + UnlockReleaseBuffer(buffer); +} + + +/* + * AlignReservation - given an unused logical byte offset, align it so that it + * falls at the start of a page. + * + * XXX: Reconsider whether we want/need to do this at all. + */ +static uint64 +AlignReservation(uint64 prevReservation) +{ + PhysicalAddr prevAddr = LogicalToPhysical(prevReservation); + uint64 alignedReservation = prevReservation; + + if (prevAddr.offset != SizeOfPageHeaderData) + { + /* not aligned; align on beginning of next page */ + PhysicalAddr initial = { 0 }; + initial.blockno = prevAddr.blockno + 1; + initial.offset = SizeOfPageHeaderData; + alignedReservation = PhysicalToLogical(initial); + } + + Assert(alignedReservation >= prevReservation); + return alignedReservation; +} + + +/* + * ColumnarMetapageIsCurrent - is the metapage at the latest version? + */ +static bool +ColumnarMetapageIsCurrent(ColumnarMetapage *metapage) +{ + return (metapage->versionMajor == COLUMNAR_VERSION_MAJOR && + metapage->versionMinor == COLUMNAR_VERSION_MINOR); +} + + +/* + * ColumnarMetapageIsOlder - is the metapage older than the current version? + */ +static bool +ColumnarMetapageIsOlder(ColumnarMetapage *metapage) +{ + return (metapage->versionMajor < COLUMNAR_VERSION_MAJOR || + (metapage->versionMajor == COLUMNAR_VERSION_MAJOR && + (int) metapage->versionMinor < (int) COLUMNAR_VERSION_MINOR)); +} + + +/* + * ColumnarMetapageIsNewer - is the metapage newer than the current version? + */ +static bool +ColumnarMetapageIsNewer(ColumnarMetapage *metapage) +{ + return (metapage->versionMajor > COLUMNAR_VERSION_MAJOR || + (metapage->versionMajor == COLUMNAR_VERSION_MAJOR && + metapage->versionMinor > COLUMNAR_VERSION_MINOR)); +} + + +/* + * ColumnarMetapageCheckVersion - throw an error if accessing old + * version of metapage. + */ +static void +ColumnarMetapageCheckVersion(Relation rel, ColumnarMetapage *metapage) +{ + if (!ColumnarMetapageIsCurrent(metapage)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg( + "attempted to access relation \"%s\", which uses an older columnar format", + RelationGetRelationName(rel)), + errdetail( + "Columnar format version %d.%d is required, \"%s\" has version %d.%d.", + COLUMNAR_VERSION_MAJOR, COLUMNAR_VERSION_MINOR, + RelationGetRelationName(rel), + metapage->versionMajor, metapage->versionMinor), + errhint( + "Use VACUUM to upgrade the columnar table format version."))); + } +} diff --git a/src/backend/columnar/columnar_tableam.c b/src/backend/columnar/columnar_tableam.c index 91b55191f..0614cb379 100644 --- a/src/backend/columnar/columnar_tableam.c +++ b/src/backend/columnar/columnar_tableam.c @@ -51,6 +51,7 @@ #include "columnar/columnar.h" #include "columnar/columnar_customscan.h" +#include "columnar/columnar_storage.h" #include "columnar/columnar_tableam.h" #include "columnar/columnar_version_compat.h" #include "distributed/commands.h" @@ -516,17 +517,24 @@ columnar_relation_set_new_filenode(Relation rel, errmsg("unlogged columnar tables are not supported"))); } - Oid oldRelfilenode = rel->rd_node.relNode; + /* + * If existing and new relfilenode are different, that means the existing + * storage was dropped and we also need to clean up the metadata and + * state. If they are equal, this is a new relation object and we don't + * need to clean anything. + */ + if (rel->rd_node.relNode != newrnode->relNode) + { + MarkRelfilenodeDropped(rel->rd_node.relNode, GetCurrentSubTransactionId()); - MarkRelfilenodeDropped(oldRelfilenode, GetCurrentSubTransactionId()); - - /* delete old relfilenode metadata */ - DeleteMetadataRows(rel->rd_node); + DeleteMetadataRows(rel->rd_node); + } *freezeXid = RecentXmin; *minmulti = GetOldestMultiXactId(); SMgrRelation srel = RelationCreateStorage(*newrnode, persistence); + ColumnarStorageInit(srel, ColumnarMetadataNewStorageId()); InitColumnarOptions(rel->rd_id); smgrclose(srel); @@ -554,7 +562,9 @@ columnar_relation_nontransactional_truncate(Relation rel) */ RelationTruncate(rel, 0); - /* we will lazily initialize new metadata in first stripe reservation */ + uint64 storageId = ColumnarMetadataNewStorageId(); + RelationOpenSmgr(rel); + ColumnarStorageInit(rel->rd_smgr, storageId); } @@ -840,34 +850,25 @@ TruncateColumnar(Relation rel, int elevel) return; } - RelationOpenSmgr(rel); - BlockNumber old_rel_pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); - RelationCloseSmgr(rel); - /* * Due to the AccessExclusive lock there's no danger that * new stripes be added beyond highestPhysicalAddress while * we're truncating. */ - SmgrAddr highestPhysicalAddress = - logical_to_smgr(GetHighestUsedAddress(rel->rd_node)); + uint64 newDataReservation = Max(GetHighestUsedAddress(rel->rd_node) + 1, + ColumnarFirstLogicalOffset); - /* - * Unlock and return if truncation won't reduce data file's size. - */ - BlockNumber new_rel_pages = Min(old_rel_pages, - highestPhysicalAddress.blockno + 1); - if (new_rel_pages == old_rel_pages) + RelationOpenSmgr(rel); + BlockNumber old_rel_pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); + + if (!ColumnarStorageTruncate(rel, newDataReservation)) { UnlockRelation(rel, AccessExclusiveLock); return; } - /* - * Truncate the storage. Note that RelationTruncate() takes care of - * Write Ahead Logging. - */ - RelationTruncate(rel, new_rel_pages); + RelationOpenSmgr(rel); + BlockNumber new_rel_pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); /* * We can release the exclusive lock as soon as we have truncated. @@ -1822,3 +1823,75 @@ alter_columnar_table_reset(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + + +/* + * upgrade_columnar_storage - upgrade columnar storage to the current + * version. + * + * DDL: + * CREATE OR REPLACE FUNCTION upgrade_columnar_storage(rel regclass) + * RETURNS VOID + * STRICT + * LANGUAGE c AS 'MODULE_PATHNAME', 'upgrade_columnar_storage'; + */ +PG_FUNCTION_INFO_V1(upgrade_columnar_storage); +Datum +upgrade_columnar_storage(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + + /* + * ACCESS EXCLUSIVE LOCK is not required by the low-level routines, so we + * can take only an ACCESS SHARE LOCK. But all access to non-current + * columnar tables will fail anyway, so it's better to take ACCESS + * EXLUSIVE LOCK now. + */ + Relation rel = table_open(relid, AccessExclusiveLock); + if (!IsColumnarTableAmTable(relid)) + { + ereport(ERROR, (errmsg("table %s is not a columnar table", + quote_identifier(RelationGetRelationName(rel))))); + } + + ColumnarStorageUpdateIfNeeded(rel, true); + + table_close(rel, AccessExclusiveLock); + PG_RETURN_VOID(); +} + + +/* + * downgrade_columnar_storage - downgrade columnar storage to the + * current version. + * + * DDL: + * CREATE OR REPLACE FUNCTION downgrade_columnar_storage(rel regclass) + * RETURNS VOID + * STRICT + * LANGUAGE c AS 'MODULE_PATHNAME', 'downgrade_columnar_storage'; + */ +PG_FUNCTION_INFO_V1(downgrade_columnar_storage); +Datum +downgrade_columnar_storage(PG_FUNCTION_ARGS) +{ + Oid relid = PG_GETARG_OID(0); + + /* + * ACCESS EXCLUSIVE LOCK is not required by the low-level routines, so we + * can take only an ACCESS SHARE LOCK. But all access to non-current + * columnar tables will fail anyway, so it's better to take ACCESS + * EXLUSIVE LOCK now. + */ + Relation rel = table_open(relid, AccessExclusiveLock); + if (!IsColumnarTableAmTable(relid)) + { + ereport(ERROR, (errmsg("table %s is not a columnar table", + quote_identifier(RelationGetRelationName(rel))))); + } + + ColumnarStorageUpdateIfNeeded(rel, false); + + table_close(rel, AccessExclusiveLock); + PG_RETURN_VOID(); +} diff --git a/src/backend/columnar/columnar_writer.c b/src/backend/columnar/columnar_writer.c index 386c4781d..44c6dfad2 100644 --- a/src/backend/columnar/columnar_writer.c +++ b/src/backend/columnar/columnar_writer.c @@ -30,6 +30,7 @@ #include "utils/relfilenodemap.h" #include "columnar/columnar.h" +#include "columnar/columnar_storage.h" #include "columnar/columnar_version_compat.h" struct ColumnarWriteState @@ -351,80 +352,6 @@ CreateEmptyStripeSkipList(uint32 stripeMaxRowCount, uint32 chunkRowCount, } -void -WriteToSmgr(Relation rel, uint64 logicalOffset, char *data, uint32 dataLength) -{ - uint64 remaining = dataLength; - Buffer buffer; - - while (remaining > 0) - { - SmgrAddr addr = logical_to_smgr(logicalOffset); - - RelationOpenSmgr(rel); - BlockNumber nblocks PG_USED_FOR_ASSERTS_ONLY = - smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); - Assert(addr.blockno < nblocks); - RelationCloseSmgr(rel); - - buffer = ReadBuffer(rel, addr.blockno); - LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); - - Page page = BufferGetPage(buffer); - PageHeader phdr = (PageHeader) page; - if (PageIsNew(page)) - { - PageInit(page, BLCKSZ, 0); - } - - /* - * After a transaction has been rolled-back, we might be - * over-writing the rolledback write, so phdr->pd_lower can be - * different from addr.offset. - * - * We reset pd_lower to reset the rolledback write. - */ - if (phdr->pd_lower > addr.offset) - { - ereport(DEBUG1, (errmsg("over-writing page %u", addr.blockno), - errdetail("This can happen after a roll-back."))); - phdr->pd_lower = addr.offset; - } - Assert(phdr->pd_lower == addr.offset); - - START_CRIT_SECTION(); - - uint64 to_write = Min(phdr->pd_upper - phdr->pd_lower, remaining); - memcpy_s(page + phdr->pd_lower, phdr->pd_upper - phdr->pd_lower, data, to_write); - phdr->pd_lower += to_write; - - MarkBufferDirty(buffer); - - if (RelationNeedsWAL(rel)) - { - XLogBeginInsert(); - - /* - * Since columnar will mostly write whole pages we force the transmission of the - * whole image in the buffer - */ - XLogRegisterBuffer(0, buffer, REGBUF_FORCE_IMAGE); - - XLogRecPtr recptr = XLogInsert(RM_GENERIC_ID, 0); - PageSetLSN(page, recptr); - } - - END_CRIT_SECTION(); - - UnlockReleaseBuffer(buffer); - - data += to_write; - remaining -= to_write; - logicalOffset += to_write; - } -} - - /* * FlushStripe flushes current stripe data into the file. The function first ensures * the last data chunk for each column is properly serialized and compressed. Then, @@ -527,8 +454,8 @@ FlushStripe(ColumnarWriteState *writeState) columnBuffers->chunkBuffersArray[chunkIndex]; StringInfo existsBuffer = chunkBuffers->existsBuffer; - WriteToSmgr(relation, currentFileOffset, - existsBuffer->data, existsBuffer->len); + ColumnarStorageWrite(relation, currentFileOffset, + existsBuffer->data, existsBuffer->len); currentFileOffset += existsBuffer->len; } @@ -538,8 +465,8 @@ FlushStripe(ColumnarWriteState *writeState) columnBuffers->chunkBuffersArray[chunkIndex]; StringInfo valueBuffer = chunkBuffers->valueBuffer; - WriteToSmgr(relation, currentFileOffset, - valueBuffer->data, valueBuffer->len); + ColumnarStorageWrite(relation, currentFileOffset, + valueBuffer->data, valueBuffer->len); currentFileOffset += valueBuffer->len; } } diff --git a/src/backend/columnar/sql/columnar--10.0-3--10.1-1.sql b/src/backend/columnar/sql/columnar--10.0-3--10.1-1.sql index 4cd1add73..d209e3b95 100644 --- a/src/backend/columnar/sql/columnar--10.0-3--10.1-1.sql +++ b/src/backend/columnar/sql/columnar--10.0-3--10.1-1.sql @@ -20,3 +20,10 @@ END$proc$; -- since we dropped pg11 support, we don't need to worry about missing -- columnar objects when upgrading postgres DROP FUNCTION citus_internal.columnar_ensure_objects_exist(); + +#include "udfs/upgrade_columnar_storage/10.1-1.sql" +#include "udfs/downgrade_columnar_storage/10.1-1.sql" + +-- upgrade storage for all columnar relations +SELECT citus_internal.upgrade_columnar_storage(c.oid) FROM pg_class c, pg_am a + WHERE c.relam = a.oid AND amname = 'columnar'; diff --git a/src/backend/columnar/sql/downgrades/columnar--10.1-1--10.0-3.sql b/src/backend/columnar/sql/downgrades/columnar--10.1-1--10.0-3.sql index 46e6e08f7..1a56238ad 100644 --- a/src/backend/columnar/sql/downgrades/columnar--10.1-1--10.0-3.sql +++ b/src/backend/columnar/sql/downgrades/columnar--10.1-1--10.0-3.sql @@ -11,3 +11,10 @@ REFERENCES columnar.stripe(storage_id, stripe_num) ON DELETE CASCADE; -- define columnar_ensure_objects_exist again #include "../udfs/columnar_ensure_objects_exist/10.0-1.sql" + +-- upgrade storage for all columnar relations +SELECT citus_internal.downgrade_columnar_storage(c.oid) FROM pg_class c, pg_am a + WHERE c.relam = a.oid AND amname = 'columnar'; + +DROP FUNCTION citus_internal.upgrade_columnar_storage(regclass); +DROP FUNCTION citus_internal.downgrade_columnar_storage(regclass); diff --git a/src/backend/columnar/sql/udfs/downgrade_columnar_storage/10.1-1.sql b/src/backend/columnar/sql/udfs/downgrade_columnar_storage/10.1-1.sql new file mode 100644 index 000000000..b41549d3d --- /dev/null +++ b/src/backend/columnar/sql/udfs/downgrade_columnar_storage/10.1-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.downgrade_columnar_storage(rel regclass) + RETURNS VOID + STRICT + LANGUAGE c AS 'MODULE_PATHNAME', $$downgrade_columnar_storage$$; + +COMMENT ON FUNCTION citus_internal.downgrade_columnar_storage(regclass) + IS 'function to downgrade the columnar storage, if necessary'; diff --git a/src/backend/columnar/sql/udfs/downgrade_columnar_storage/latest.sql b/src/backend/columnar/sql/udfs/downgrade_columnar_storage/latest.sql new file mode 100644 index 000000000..b41549d3d --- /dev/null +++ b/src/backend/columnar/sql/udfs/downgrade_columnar_storage/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.downgrade_columnar_storage(rel regclass) + RETURNS VOID + STRICT + LANGUAGE c AS 'MODULE_PATHNAME', $$downgrade_columnar_storage$$; + +COMMENT ON FUNCTION citus_internal.downgrade_columnar_storage(regclass) + IS 'function to downgrade the columnar storage, if necessary'; diff --git a/src/backend/columnar/sql/udfs/upgrade_columnar_storage/10.1-1.sql b/src/backend/columnar/sql/udfs/upgrade_columnar_storage/10.1-1.sql new file mode 100644 index 000000000..a82bcc758 --- /dev/null +++ b/src/backend/columnar/sql/udfs/upgrade_columnar_storage/10.1-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.upgrade_columnar_storage(rel regclass) + RETURNS VOID + STRICT + LANGUAGE c AS 'MODULE_PATHNAME', $$upgrade_columnar_storage$$; + +COMMENT ON FUNCTION citus_internal.upgrade_columnar_storage(regclass) + IS 'function to upgrade the columnar storage, if necessary'; diff --git a/src/backend/columnar/sql/udfs/upgrade_columnar_storage/latest.sql b/src/backend/columnar/sql/udfs/upgrade_columnar_storage/latest.sql new file mode 100644 index 000000000..a82bcc758 --- /dev/null +++ b/src/backend/columnar/sql/udfs/upgrade_columnar_storage/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION citus_internal.upgrade_columnar_storage(rel regclass) + RETURNS VOID + STRICT + LANGUAGE c AS 'MODULE_PATHNAME', $$upgrade_columnar_storage$$; + +COMMENT ON FUNCTION citus_internal.upgrade_columnar_storage(regclass) + IS 'function to upgrade the columnar storage, if necessary'; diff --git a/src/backend/distributed/transaction/lock_graph.c b/src/backend/distributed/transaction/lock_graph.c index 8bd2150a4..aed021ae0 100644 --- a/src/backend/distributed/transaction/lock_graph.c +++ b/src/backend/distributed/transaction/lock_graph.c @@ -25,7 +25,6 @@ #include "distributed/lock_graph.h" #include "distributed/metadata_cache.h" #include "distributed/remote_commands.h" -#include "distributed/resource_lock.h" #include "distributed/tuplestore.h" #include "storage/proc.h" #include "utils/builtins.h" @@ -472,18 +471,9 @@ IsProcessWaitingForSafeOperations(PGPROC *proc) PROCLOCK *waitProcLock = proc->waitProcLock; LOCK *waitLock = waitProcLock->tag.myLock; - /* - * Stripe reservation locks are temporary & don't hold until end of - * transaction, so we shouldn't include them in the lock graph. - */ - bool stripeReservationLock = - waitLock->tag.locktag_type == LOCKTAG_ADVISORY && - waitLock->tag.locktag_field4 == ADV_LOCKTAG_CLASS_COLUMNAR_STRIPE_RESERVATION; - return waitLock->tag.locktag_type == LOCKTAG_RELATION_EXTEND || waitLock->tag.locktag_type == LOCKTAG_PAGE || - waitLock->tag.locktag_type == LOCKTAG_SPECULATIVE_TOKEN || - stripeReservationLock; + waitLock->tag.locktag_type == LOCKTAG_SPECULATIVE_TOKEN; } diff --git a/src/include/columnar/columnar.h b/src/include/columnar/columnar.h index e6101cf18..78c880593 100644 --- a/src/include/columnar/columnar.h +++ b/src/include/columnar/columnar.h @@ -39,8 +39,8 @@ #define COMPRESSION_LEVEL_MAX 19 /* Columnar file signature */ -#define COLUMNAR_VERSION_MAJOR 1 -#define COLUMNAR_VERSION_MINOR 7 +#define COLUMNAR_VERSION_MAJOR 2 +#define COLUMNAR_VERSION_MINOR 0 /* miscellaneous defines */ #define COLUMNAR_TUPLE_COST_MULTIPLIER 10 @@ -233,13 +233,11 @@ extern void InitColumnarOptions(Oid regclass); extern void SetColumnarOptions(Oid regclass, ColumnarOptions *options); extern bool DeleteColumnarTableOptions(Oid regclass, bool missingOk); extern bool ReadColumnarOptions(Oid regclass, ColumnarOptions *options); -extern void WriteToSmgr(Relation relation, uint64 logicalOffset, - char *data, uint32 dataLength); -extern StringInfo ReadFromSmgr(Relation rel, uint64 offset, uint32 size); extern bool IsColumnarTableAmTable(Oid relationId); /* columnar_metadata_tables.c */ extern void DeleteMetadataRows(RelFileNode relfilenode); +extern uint64 ColumnarMetadataNewStorageId(void); extern uint64 GetHighestUsedAddress(RelFileNode relfilenode); extern StripeMetadata ReserveStripe(Relation rel, uint64 size, uint64 rowCount, uint64 columnCount, @@ -271,51 +269,5 @@ extern bool PendingWritesInUpperTransactions(Oid relfilenode, SubTransactionId currentSubXid); extern MemoryContext GetWriteContextForDebug(void); -typedef struct SmgrAddr -{ - BlockNumber blockno; - uint32 offset; -} SmgrAddr; - -/* - * Map logical offsets (as tracked in the metadata) to a physical page and - * offset where the data is kept. - */ -static inline SmgrAddr -logical_to_smgr(uint64 logicalOffset) -{ - SmgrAddr addr; - - addr.blockno = logicalOffset / COLUMNAR_BYTES_PER_PAGE; - addr.offset = SizeOfPageHeaderData + (logicalOffset % COLUMNAR_BYTES_PER_PAGE); - - return addr; -} - - -/* - * Map a physical page adnd offset address to a logical address. - */ -static inline uint64 -smgr_to_logical(SmgrAddr addr) -{ - return COLUMNAR_BYTES_PER_PAGE * addr.blockno + addr.offset - SizeOfPageHeaderData; -} - - -/* - * Get the first usable address of next block. - */ -static inline SmgrAddr -next_block_start(SmgrAddr addr) -{ - SmgrAddr result = { - .blockno = addr.blockno + 1, - .offset = SizeOfPageHeaderData - }; - - return result; -} - #endif /* COLUMNAR_H */ diff --git a/src/include/columnar/columnar_metadata.h b/src/include/columnar/columnar_metadata.h index 26b3effcf..c041b56d6 100644 --- a/src/include/columnar/columnar_metadata.h +++ b/src/include/columnar/columnar_metadata.h @@ -28,5 +28,6 @@ typedef struct StripeMetadata } StripeMetadata; extern List * StripesForRelfilenode(RelFileNode relfilenode); +extern void ColumnarStorageUpdateIfNeeded(Relation rel, bool isUpgrade); #endif /* COLUMNAR_METADATA_H */ diff --git a/src/include/columnar/columnar_storage.h b/src/include/columnar/columnar_storage.h new file mode 100644 index 000000000..fe90b96ed --- /dev/null +++ b/src/include/columnar/columnar_storage.h @@ -0,0 +1,54 @@ +/*------------------------------------------------------------------------- + * + * columnar_storage.h + * + * Type and function declarations for storage of columnar data in blocks. + * + * Copyright (c) Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef COLUMNAR_STORAGE_H +#define COLUMNAR_STORAGE_H + +#include "postgres.h" + +#include "storage/smgr.h" +#include "utils/rel.h" + + +/* + * Logical offsets never fall on the first two physical pages. See + * comments in columnar_storage.c. + */ +#define ColumnarInvalidLogicalOffset 0 +#define ColumnarFirstLogicalOffset ((BLCKSZ - SizeOfPageHeaderData) * 2) +#define ColumnarLogicalOffsetIsValid(X) ((X) >= ColumnarFirstLogicalOffset) + + +extern void ColumnarStorageInit(SMgrRelation srel, uint64 storageId); +extern bool ColumnarStorageIsCurrent(Relation rel); +extern void ColumnarStorageUpdateCurrent(Relation rel, bool upgrade, + uint64 reservedStripeId, + uint64 reservedRowNumber, + uint64 reservedOffset); + +extern uint64 ColumnarStorageGetVersionMajor(Relation rel, bool force); +extern uint64 ColumnarStorageGetVersionMinor(Relation rel, bool force); +extern uint64 ColumnarStorageGetStorageId(Relation rel, bool force); +extern uint64 ColumnarStorageGetReservedStripeId(Relation rel, bool force); +extern uint64 ColumnarStorageGetReservedRowNumber(Relation rel, bool force); +extern uint64 ColumnarStorageGetReservedOffset(Relation rel, bool force); + +extern uint64 ColumnarStorageReserveData(Relation rel, uint64 amount); +extern uint64 ColumnarStorageReserveStripe(Relation rel, uint64 nrows, + uint64 *firstRowNumber); + +extern void ColumnarStorageRead(Relation rel, uint64 logicalOffset, + char *data, uint32 amount); +extern void ColumnarStorageWrite(Relation rel, uint64 logicalOffset, + char *data, uint32 amount); +extern bool ColumnarStorageTruncate(Relation rel, uint64 newDataReservation); + +#endif /* COLUMNAR_STORAGE_H */ diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index 5db9403b4..7241831dd 100644 --- a/src/include/distributed/resource_lock.h +++ b/src/include/distributed/resource_lock.h @@ -39,10 +39,7 @@ typedef enum AdvisoryLocktagClass ADV_LOCKTAG_CLASS_CITUS_REBALANCE_COLOCATION = 7, ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA = 8, ADV_LOCKTAG_CLASS_CITUS_OPERATIONS = 9, - ADV_LOCKTAG_CLASS_CITUS_PLACEMENT_CLEANUP = 10, - - /* Columnar lock types */ - ADV_LOCKTAG_CLASS_COLUMNAR_STRIPE_RESERVATION = 11 + ADV_LOCKTAG_CLASS_CITUS_PLACEMENT_CLEANUP = 10 } AdvisoryLocktagClass; /* CitusOperations has constants for citus operations */ @@ -102,13 +99,6 @@ typedef enum CitusOperations (uint32) operationId, \ ADV_LOCKTAG_CLASS_CITUS_OPERATIONS) -#define SET_LOCKTAG_COLUMNAR_STRIPE_RESERVATION(tag, relation) \ - SET_LOCKTAG_ADVISORY(tag, \ - relation->rd_lockInfo.lockRelId.dbId, \ - relation->rd_lockInfo.lockRelId.relId, \ - 0, \ - ADV_LOCKTAG_CLASS_COLUMNAR_STRIPE_RESERVATION) - /* reuse advisory lock, but with different, unused field 4 (10) * Also it has the database hardcoded to MyDatabaseId, to ensure the locks * are local to each database */ diff --git a/src/test/regress/after_citus_upgrade_coord_schedule b/src/test/regress/after_citus_upgrade_coord_schedule index 4610bf535..49b4e73d9 100644 --- a/src/test/regress/after_citus_upgrade_coord_schedule +++ b/src/test/regress/after_citus_upgrade_coord_schedule @@ -3,3 +3,4 @@ test: upgrade_basic_after test: upgrade_partition_constraints_after test: upgrade_pg_dist_object_test_after +test: upgrade_columnar_metapage_after diff --git a/src/test/regress/before_citus_upgrade_coord_schedule b/src/test/regress/before_citus_upgrade_coord_schedule index 9d015838b..006217e8a 100644 --- a/src/test/regress/before_citus_upgrade_coord_schedule +++ b/src/test/regress/before_citus_upgrade_coord_schedule @@ -3,3 +3,4 @@ test: upgrade_basic_before test: upgrade_partition_constraints_before test: upgrade_pg_dist_object_test_before +test: upgrade_columnar_metapage_before diff --git a/src/test/regress/expected/columnar_alter.out b/src/test/regress/expected/columnar_alter.out index 0c22f0046..0f65a84d4 100644 --- a/src/test/regress/expected/columnar_alter.out +++ b/src/test/regress/expected/columnar_alter.out @@ -12,6 +12,14 @@ WITH sample_data AS (VALUES INSERT INTO test_alter_table SELECT * FROM sample_data; -- drop a column ALTER TABLE test_alter_table DROP COLUMN a; +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('test_alter_table'); + version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset +--------------------------------------------------------------------- + 2 | 0 | 2 | 1 | 16402 +(1 row) + -- test analyze ANALYZE test_alter_table; -- verify select queries run as expected @@ -59,6 +67,14 @@ SELECT * FROM test_alter_table; 3 | 5 | 8 (5 rows) +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('test_alter_table'); + version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset +--------------------------------------------------------------------- + 2 | 0 | 4 | 1 | 32724 +(1 row) + -- add a fixed-length column with default value ALTER TABLE test_alter_table ADD COLUMN e int default 3; SELECT * from test_alter_table; @@ -83,6 +99,14 @@ SELECT * from test_alter_table; 1 | 2 | 4 | 8 (6 rows) +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('test_alter_table'); + version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset +--------------------------------------------------------------------- + 2 | 0 | 5 | 1 | 40906 +(1 row) + -- add a variable-length column with default value ALTER TABLE test_alter_table ADD COLUMN f text DEFAULT 'TEXT ME'; SELECT * from test_alter_table; diff --git a/src/test/regress/expected/columnar_create.out b/src/test/regress/expected/columnar_create.out index 5630ee811..772d6ac64 100644 --- a/src/test/regress/expected/columnar_create.out +++ b/src/test/regress/expected/columnar_create.out @@ -5,11 +5,16 @@ CREATE TABLE contestant (handle TEXT, birthdate DATE, rating INT, percentile FLOAT, country CHAR(3), achievements TEXT[]) USING columnar; +SELECT alter_columnar_table_set('contestant', compression => 'none'); + alter_columnar_table_set +--------------------------------------------------------------------- + +(1 row) + -- should fail CREATE INDEX contestant_idx on contestant(handle); ERROR: indexes not supported for columnar tables --- Create compressed table with automatically determined file path --- COMPRESSED +-- Create zstd compressed table CREATE TABLE contestant_compressed (handle TEXT, birthdate DATE, rating INT, percentile FLOAT, country CHAR(3), achievements TEXT[]) USING columnar; diff --git a/src/test/regress/expected/columnar_empty.out b/src/test/regress/expected/columnar_empty.out index 6a5b2d15d..67bde4e0a 100644 --- a/src/test/regress/expected/columnar_empty.out +++ b/src/test/regress/expected/columnar_empty.out @@ -26,7 +26,7 @@ SELECT alter_columnar_table_set('t_compressed', chunk_group_row_limit => 100); SELECT * FROM columnar.options WHERE regclass = 't_compressed'::regclass; regclass | chunk_group_row_limit | stripe_row_limit | compression_level | compression --------------------------------------------------------------------- - t_compressed | 100 | 100 | 3 | pglz + t_compressed | 100 | 100 | 3 | pglz (1 row) -- select @@ -52,6 +52,23 @@ select count(*) from t_compressed; 0 (1 row) +-- check storage +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('t_compressed'); + version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset +--------------------------------------------------------------------- + 2 | 0 | 1 | 1 | 16336 +(1 row) + +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('t_uncompressed'); + version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset +--------------------------------------------------------------------- + 2 | 0 | 1 | 1 | 16336 +(1 row) + -- explain explain (costs off, summary off, timing off) select * from t_uncompressed; QUERY PLAN @@ -68,16 +85,16 @@ explain (costs off, summary off, timing off) select * from t_compressed; -- vacuum vacuum verbose t_compressed; INFO: statistics for "t_compressed": -storage id: -1 -total file size: 0, total data size: 0 +storage id: xxxxx +total file size: 16384, total data size: 0 compression rate: 1.00x total row count: 0, stripe count: 0, average rows per stripe: 0 chunk count: 0, containing data for dropped columns: 0 vacuum verbose t_uncompressed; INFO: statistics for "t_uncompressed": -storage id: -1 -total file size: 0, total data size: 0 +storage id: xxxxx +total file size: 16384, total data size: 0 compression rate: 1.00x total row count: 0, stripe count: 0, average rows per stripe: 0 chunk count: 0, containing data for dropped columns: 0 @@ -85,6 +102,23 @@ chunk count: 0, containing data for dropped columns: 0 -- vacuum full vacuum full t_compressed; vacuum full t_uncompressed; +-- check storage +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('t_compressed'); + version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset +--------------------------------------------------------------------- + 2 | 0 | 1 | 1 | 16336 +(1 row) + +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('t_uncompressed'); + version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset +--------------------------------------------------------------------- + 2 | 0 | 1 | 1 | 16336 +(1 row) + -- analyze analyze t_uncompressed; analyze t_compressed; @@ -94,6 +128,23 @@ truncate t_compressed; -- alter type alter table t_uncompressed alter column a type text; alter table t_compressed alter column a type text; +-- check storage +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('t_compressed'); + version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset +--------------------------------------------------------------------- + 2 | 0 | 1 | 1 | 16336 +(1 row) + +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('t_uncompressed'); + version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset +--------------------------------------------------------------------- + 2 | 0 | 1 | 1 | 16336 +(1 row) + -- verify cost of scanning an empty table is zero, not NaN explain table t_uncompressed; QUERY PLAN diff --git a/src/test/regress/expected/columnar_insert.out b/src/test/regress/expected/columnar_insert.out index 24a7b6de2..d243dda4b 100644 --- a/src/test/regress/expected/columnar_insert.out +++ b/src/test/regress/expected/columnar_insert.out @@ -45,6 +45,14 @@ select count(*) from test_insert_command; 3 (1 row) +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('test_insert_command'); + version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset +--------------------------------------------------------------------- + 2 | 0 | 4 | 1 | 32686 +(1 row) + SELECT * FROM columnar_test_helpers.chunk_group_consistency; consistent --------------------------------------------------------------------- @@ -141,6 +149,14 @@ FROM test_toast_columnar; 5004 | 5004 | 5004 | 5004 (1 row) +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('test_toast_columnar'); + version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset +--------------------------------------------------------------------- + 2 | 0 | 2 | 1 | 16428 +(1 row) + SELECT * FROM columnar_test_helpers.chunk_group_consistency; consistent --------------------------------------------------------------------- @@ -173,6 +189,14 @@ INSERT INTO zero_col_heap SELECT * FROM zero_col_heap; INSERT INTO zero_col_heap SELECT * FROM zero_col_heap; INSERT INTO zero_col_heap SELECT * FROM zero_col_heap; INSERT INTO zero_col SELECT * FROM zero_col_heap; +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('zero_col'); + version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset +--------------------------------------------------------------------- + 2 | 0 | 6 | 1 | 16336 +(1 row) + SELECT relname, stripe_num, chunk_group_count, row_count FROM columnar.stripe a, pg_class b WHERE columnar_test_helpers.columnar_relation_storageid(b.oid)=a.storage_id AND relname = 'zero_col' ORDER BY 1,2,3,4; diff --git a/src/test/regress/expected/columnar_rollback.out b/src/test/regress/expected/columnar_rollback.out index 91c73bf63..89311e264 100644 --- a/src/test/regress/expected/columnar_rollback.out +++ b/src/test/regress/expected/columnar_rollback.out @@ -14,6 +14,14 @@ SELECT count(*) FROM t; 0 (1 row) +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('t'); + version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset +--------------------------------------------------------------------- + 2 | 0 | 1 | 1 | 16336 +(1 row) + -- check stripe metadata also have been rolled-back SELECT count(*) FROM t_stripes; count @@ -46,6 +54,14 @@ SELECT count(*) FROM t; -- force flush SAVEPOINT s1; INSERT INTO t SELECT i, i+1 FROM generate_series(1, 10) i; +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('t'); + version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset +--------------------------------------------------------------------- + 2 | 0 | 3 | 1 | 24606 +(1 row) + SELECT count(*) FROM t; count --------------------------------------------------------------------- @@ -68,6 +84,14 @@ SELECT count(*) FROM t; INSERT INTO t SELECT i, i+1 FROM generate_series(1, 10) i; COMMIT; +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('t'); + version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset +--------------------------------------------------------------------- + 2 | 0 | 5 | 1 | 40942 +(1 row) + SELECT count(*) FROM t; count --------------------------------------------------------------------- diff --git a/src/test/regress/expected/columnar_test_helpers.out b/src/test/regress/expected/columnar_test_helpers.out index b28086243..2f7cad23b 100644 --- a/src/test/regress/expected/columnar_test_helpers.out +++ b/src/test/regress/expected/columnar_test_helpers.out @@ -3,6 +3,16 @@ SET search_path TO columnar_test_helpers; CREATE FUNCTION columnar_relation_storageid(relid oid) RETURNS bigint LANGUAGE C STABLE STRICT AS 'citus', $$columnar_relation_storageid$$; +CREATE OR REPLACE FUNCTION columnar_storage_info( + rel regclass, + version_major OUT int4, + version_minor OUT int4, + storage_id OUT int8, + reserved_stripe_id OUT int8, + reserved_row_number OUT int8, + reserved_offset OUT int8) + STRICT + LANGUAGE c AS 'citus', $$columnar_storage_info$$; CREATE FUNCTION compression_type_supported(type text) RETURNS boolean AS $$ BEGIN diff --git a/src/test/regress/expected/columnar_truncate.out b/src/test/regress/expected/columnar_truncate.out index 55a496644..ec1f11b4f 100644 --- a/src/test/regress/expected/columnar_truncate.out +++ b/src/test/regress/expected/columnar_truncate.out @@ -43,7 +43,23 @@ SELECT * FROM columnar_test_helpers.chunk_group_consistency; t (1 row) +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('columnar_truncate_test'); + version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset +--------------------------------------------------------------------- + 2 | 0 | 2 | 1 | 16438 +(1 row) + TRUNCATE TABLE columnar_truncate_test; +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('columnar_truncate_test'); + version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset +--------------------------------------------------------------------- + 2 | 0 | 1 | 1 | 16336 +(1 row) + SELECT * FROM columnar_test_helpers.chunk_group_consistency; consistent --------------------------------------------------------------------- @@ -77,7 +93,7 @@ SELECT count(*) FROM columnar_truncate_test_compressed; SELECT pg_relation_size('columnar_truncate_test_compressed'); pg_relation_size --------------------------------------------------------------------- - 0 + 16384 (1 row) INSERT INTO columnar_truncate_test select a, a from generate_series(1, 10) a; diff --git a/src/test/regress/expected/columnar_vacuum.out b/src/test/regress/expected/columnar_vacuum.out index 949220632..5f6402240 100644 --- a/src/test/regress/expected/columnar_vacuum.out +++ b/src/test/regress/expected/columnar_vacuum.out @@ -25,6 +25,14 @@ SELECT count(*) FROM t_stripes; 3 (1 row) +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('t'); + version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset +--------------------------------------------------------------------- + 2 | 0 | 4 | 1 | 32756 +(1 row) + -- vacuum full should merge stripes together VACUUM FULL t; SELECT * FROM columnar_test_helpers.chunk_group_consistency; @@ -45,6 +53,14 @@ SELECT count(*) FROM t_stripes; 1 (1 row) +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('t'); + version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset +--------------------------------------------------------------------- + 2 | 0 | 2 | 1 | 16584 +(1 row) + -- test the case when all data cannot fit into a single stripe SELECT alter_columnar_table_set('t', stripe_row_limit => 1000); alter_columnar_table_set @@ -66,6 +82,14 @@ SELECT count(*) FROM t_stripes; (1 row) VACUUM FULL t; +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('t'); + version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset +--------------------------------------------------------------------- + 2 | 0 | 4 | 1 | 53382 +(1 row) + SELECT * FROM columnar_test_helpers.chunk_group_consistency; consistent --------------------------------------------------------------------- @@ -215,6 +239,14 @@ compression rate: 1.25x total row count: 5530, stripe count: 5, average rows per stripe: 1106 chunk count: 7, containing data for dropped columns: 0, none compressed: 5, pglz compressed: 2 +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('t'); + version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset +--------------------------------------------------------------------- + 2 | 0 | 16 | 1 | 50686 +(1 row) + SELECT * FROM columnar_test_helpers.chunk_group_consistency; consistent --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index b22e6d5ae..5fa875416 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -564,12 +564,14 @@ SELECT * FROM print_extension_changes(); --------------------------------------------------------------------- function citus_internal.columnar_ensure_objects_exist() | function create_distributed_table(regclass,text,citus.distribution_type,text) | + | function citus_internal.downgrade_columnar_storage(regclass) + | function citus_internal.upgrade_columnar_storage(regclass) | function citus_local_disk_space_stats() | function create_distributed_table(regclass,text,citus.distribution_type,text,integer) | function worker_partitioned_relation_size(regclass) | function worker_partitioned_relation_total_size(regclass) | function worker_partitioned_table_size(regclass) -(7 rows) +(9 rows) DROP TABLE prev_objects, extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_columnar_metapage_after.out b/src/test/regress/expected/upgrade_columnar_metapage_after.out new file mode 100644 index 000000000..beafe92c0 --- /dev/null +++ b/src/test/regress/expected/upgrade_columnar_metapage_after.out @@ -0,0 +1,68 @@ +\set upgrade_test_old_citus_version `echo "$upgrade_test_old_citus_version"` +SELECT substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int >= 10 AND + substring(:'upgrade_test_old_citus_version', 'v\d+\.(\d+)\.\d+')::int >= 0 +AS upgrade_test_old_citus_version_ge_10_0; + upgrade_test_old_citus_version_ge_10_0 +--------------------------------------------------------------------- + t +(1 row) + +\gset +\if :upgrade_test_old_citus_version_ge_10_0 +\else +\q +\endif +-- it's not the best practice to define this here, but we don't want to include +-- columnar_test_helpers in upgrade test schedule +CREATE OR REPLACE FUNCTION columnar_storage_info( + rel regclass, + version_major OUT int4, + version_minor OUT int4, + storage_id OUT int8, + reserved_stripe_id OUT int8, + reserved_row_number OUT int8, + reserved_offset OUT int8) +STRICT +LANGUAGE c AS 'citus', 'columnar_storage_info'; +SET search_path TO upgrade_columnar_metapage, public; +-- should work since we upgrade metapages when upgrading schema version +INSERT INTO columnar_table_1 VALUES (3); +-- show that all columnar relation's metapage's are upgraded to "2.0" +SELECT count(*)=0 +FROM (SELECT (columnar_storage_info(c.oid)).* t + FROM pg_class c, pg_am a + WHERE c.relam = a.oid AND amname = 'columnar') t +WHERE t.version_major != 2 and t.version_minor != 0; + ?column? +--------------------------------------------------------------------- + t +(1 row) + +-- print metapage for two of the tables +SELECT columnar_storage_info('columnar_table_1'); + columnar_storage_info +--------------------------------------------------------------------- + (2,0,10000000000,4,0,481936) +(1 row) + +SELECT columnar_storage_info('columnar_table_2'); + columnar_storage_info +--------------------------------------------------------------------- + (2,0,10000000001,2,0,16350) +(1 row) + +-- table is already upgraded, make sure that upgrade_columnar_metapage is no-op +SELECT citus_internal.upgrade_columnar_storage(c.oid) +FROM pg_class c, pg_am a +WHERE c.relam = a.oid AND amname = 'columnar' and relname = 'columnar_table_2'; + upgrade_columnar_storage +--------------------------------------------------------------------- + +(1 row) + +SELECT columnar_storage_info('columnar_table_2'); + columnar_storage_info +--------------------------------------------------------------------- + (2,0,10000000001,2,0,16350) +(1 row) + diff --git a/src/test/regress/expected/upgrade_columnar_metapage_after_0.out b/src/test/regress/expected/upgrade_columnar_metapage_after_0.out new file mode 100644 index 000000000..3d2de4490 --- /dev/null +++ b/src/test/regress/expected/upgrade_columnar_metapage_after_0.out @@ -0,0 +1,13 @@ +\set upgrade_test_old_citus_version `echo "$upgrade_test_old_citus_version"` +SELECT substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int >= 10 AND + substring(:'upgrade_test_old_citus_version', 'v\d+\.(\d+)\.\d+')::int >= 0 +AS upgrade_test_old_citus_version_ge_10_0; + upgrade_test_old_citus_version_ge_10_0 +--------------------------------------------------------------------- + f +(1 row) + +\gset +\if :upgrade_test_old_citus_version_ge_10_0 +\else +\q diff --git a/src/test/regress/expected/upgrade_columnar_metapage_before.out b/src/test/regress/expected/upgrade_columnar_metapage_before.out new file mode 100644 index 000000000..debbe2ef2 --- /dev/null +++ b/src/test/regress/expected/upgrade_columnar_metapage_before.out @@ -0,0 +1,23 @@ +\set upgrade_test_old_citus_version `echo "$upgrade_test_old_citus_version"` +SELECT substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int >= 10 AND + substring(:'upgrade_test_old_citus_version', 'v\d+\.(\d+)\.\d+')::int >= 0 +AS upgrade_test_old_citus_version_ge_10_0; + upgrade_test_old_citus_version_ge_10_0 +--------------------------------------------------------------------- + t +(1 row) + +\gset +\if :upgrade_test_old_citus_version_ge_10_0 +\else +\q +\endif +CREATE SCHEMA upgrade_columnar_metapage; +SET search_path TO upgrade_columnar_metapage, public; +CREATE TABLE columnar_table_1(a INT, b INT) USING columnar; +INSERT INTO columnar_table_1 SELECT i FROM generate_series(160001, 320000) i; +CREATE TABLE columnar_table_2(b INT) USING columnar; +INSERT INTO columnar_table_2 VALUES (160); +CREATE TABLE columnar_table_3(b INT) USING columnar; +INSERT INTO columnar_table_3 VALUES (1), (2); +CREATE TABLE no_data_columnar_table(a INT, b INT, c TEXT) USING columnar; diff --git a/src/test/regress/expected/upgrade_columnar_metapage_before_0.out b/src/test/regress/expected/upgrade_columnar_metapage_before_0.out new file mode 100644 index 000000000..3d2de4490 --- /dev/null +++ b/src/test/regress/expected/upgrade_columnar_metapage_before_0.out @@ -0,0 +1,13 @@ +\set upgrade_test_old_citus_version `echo "$upgrade_test_old_citus_version"` +SELECT substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int >= 10 AND + substring(:'upgrade_test_old_citus_version', 'v\d+\.(\d+)\.\d+')::int >= 0 +AS upgrade_test_old_citus_version_ge_10_0; + upgrade_test_old_citus_version_ge_10_0 +--------------------------------------------------------------------- + f +(1 row) + +\gset +\if :upgrade_test_old_citus_version_ge_10_0 +\else +\q diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 72c5a79a2..06065b049 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -56,6 +56,7 @@ ORDER BY 1; function citus_extradata_container(internal) function citus_finish_pg_upgrade() function citus_get_active_worker_nodes() + function citus_internal.downgrade_columnar_storage(regclass) function citus_internal.find_groupid_for_node(text,integer) function citus_internal.pg_dist_node_trigger_func() function citus_internal.pg_dist_rebalance_strategy_enterprise_check() @@ -64,6 +65,7 @@ ORDER BY 1; function citus_internal.refresh_isolation_tester_prepared_statement() function citus_internal.replace_isolation_tester_func() function citus_internal.restore_isolation_tester_func() + function citus_internal.upgrade_columnar_storage(regclass) function citus_isolation_test_session_is_blocked(integer,integer[]) function citus_json_concatenate(json,json) function citus_json_concatenate_final(json) @@ -246,5 +248,5 @@ ORDER BY 1; view citus_worker_stat_activity view pg_dist_shard_placement view time_partitions -(230 rows) +(232 rows) diff --git a/src/test/regress/expected/upgrade_list_citus_objects_0.out b/src/test/regress/expected/upgrade_list_citus_objects_0.out index 5b5c04fb3..9752c6502 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects_0.out +++ b/src/test/regress/expected/upgrade_list_citus_objects_0.out @@ -53,6 +53,7 @@ ORDER BY 1; function citus_extradata_container(internal) function citus_finish_pg_upgrade() function citus_get_active_worker_nodes() + function citus_internal.downgrade_columnar_storage(regclass) function citus_internal.find_groupid_for_node(text,integer) function citus_internal.pg_dist_node_trigger_func() function citus_internal.pg_dist_rebalance_strategy_enterprise_check() @@ -61,6 +62,7 @@ ORDER BY 1; function citus_internal.refresh_isolation_tester_prepared_statement() function citus_internal.replace_isolation_tester_func() function citus_internal.restore_isolation_tester_func() + function citus_internal.upgrade_columnar_storage(regclass) function citus_isolation_test_session_is_blocked(integer,integer[]) function citus_json_concatenate(json,json) function citus_json_concatenate_final(json) @@ -238,5 +240,5 @@ ORDER BY 1; view citus_worker_stat_activity view pg_dist_shard_placement view time_partitions -(222 rows) +(224 rows) diff --git a/src/test/regress/input/columnar_load.source b/src/test/regress/input/columnar_load.source index b4b72926c..e00107d83 100644 --- a/src/test/regress/input/columnar_load.source +++ b/src/test/regress/input/columnar_load.source @@ -15,14 +15,20 @@ COPY contestant FROM '@abs_srcdir@/data/contestants.1.csv' WITH CSV; -- COPY into uncompressed table from program COPY contestant FROM PROGRAM 'cat @abs_srcdir@/data/contestants.2.csv' WITH CSV; +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('contestant'); + -- COPY into compressed table -set columnar.compression = 'pglz'; COPY contestant_compressed FROM '@abs_srcdir@/data/contestants.1.csv' WITH CSV; -- COPY into uncompressed table from program COPY contestant_compressed FROM PROGRAM 'cat @abs_srcdir@/data/contestants.2.csv' WITH CSV; -set columnar.compression to default; + +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('contestant_compressed'); -- Test column list CREATE TABLE famous_constants (id int, name text, value real) diff --git a/src/test/regress/output/columnar_load.source b/src/test/regress/output/columnar_load.source index c58f0bfb0..50cbc05fd 100644 --- a/src/test/regress/output/columnar_load.source +++ b/src/test/regress/output/columnar_load.source @@ -14,13 +14,27 @@ DETAIL: command not found COPY contestant FROM '@abs_srcdir@/data/contestants.1.csv' WITH CSV; -- COPY into uncompressed table from program COPY contestant FROM PROGRAM 'cat @abs_srcdir@/data/contestants.2.csv' WITH CSV; +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('contestant'); + version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset +--------------------------------------------------------------------- + 2 | 0 | 3 | 1 | 24742 +(1 row) + -- COPY into compressed table -set columnar.compression = 'pglz'; COPY contestant_compressed FROM '@abs_srcdir@/data/contestants.1.csv' WITH CSV; -- COPY into uncompressed table from program COPY contestant_compressed FROM PROGRAM 'cat @abs_srcdir@/data/contestants.2.csv' WITH CSV; -set columnar.compression to default; +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('contestant_compressed'); + version_major | version_minor | reserved_stripe_id | reserved_row_number | reserved_offset +--------------------------------------------------------------------- + 2 | 0 | 3 | 1 | 24704 +(1 row) + -- Test column list CREATE TABLE famous_constants (id int, name text, value real) USING columnar; diff --git a/src/test/regress/sql/columnar_alter.sql b/src/test/regress/sql/columnar_alter.sql index b9b4498cb..ed916967e 100644 --- a/src/test/regress/sql/columnar_alter.sql +++ b/src/test/regress/sql/columnar_alter.sql @@ -17,6 +17,10 @@ INSERT INTO test_alter_table SELECT * FROM sample_data; -- drop a column ALTER TABLE test_alter_table DROP COLUMN a; +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('test_alter_table'); + -- test analyze ANALYZE test_alter_table; @@ -36,6 +40,10 @@ SELECT * FROM test_alter_table; INSERT INTO test_alter_table (SELECT 3, 5, 8); SELECT * FROM test_alter_table; +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('test_alter_table'); + -- add a fixed-length column with default value ALTER TABLE test_alter_table ADD COLUMN e int default 3; @@ -43,6 +51,10 @@ SELECT * from test_alter_table; INSERT INTO test_alter_table (SELECT 1, 2, 4, 8); SELECT * from test_alter_table; +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('test_alter_table'); + -- add a variable-length column with default value ALTER TABLE test_alter_table ADD COLUMN f text DEFAULT 'TEXT ME'; diff --git a/src/test/regress/sql/columnar_create.sql b/src/test/regress/sql/columnar_create.sql index 41ffc1053..0c18f2212 100644 --- a/src/test/regress/sql/columnar_create.sql +++ b/src/test/regress/sql/columnar_create.sql @@ -7,12 +7,12 @@ CREATE TABLE contestant (handle TEXT, birthdate DATE, rating INT, percentile FLOAT, country CHAR(3), achievements TEXT[]) USING columnar; +SELECT alter_columnar_table_set('contestant', compression => 'none'); -- should fail CREATE INDEX contestant_idx on contestant(handle); --- Create compressed table with automatically determined file path --- COMPRESSED +-- Create zstd compressed table CREATE TABLE contestant_compressed (handle TEXT, birthdate DATE, rating INT, percentile FLOAT, country CHAR(3), achievements TEXT[]) USING columnar; diff --git a/src/test/regress/sql/columnar_empty.sql b/src/test/regress/sql/columnar_empty.sql index 9ed2c1fc1..a733a8540 100644 --- a/src/test/regress/sql/columnar_empty.sql +++ b/src/test/regress/sql/columnar_empty.sql @@ -19,6 +19,14 @@ select count(*) from t_uncompressed; select * from t_compressed; select count(*) from t_compressed; +-- check storage +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('t_compressed'); +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('t_uncompressed'); + -- explain explain (costs off, summary off, timing off) select * from t_uncompressed; explain (costs off, summary off, timing off) select * from t_compressed; @@ -31,6 +39,14 @@ vacuum verbose t_uncompressed; vacuum full t_compressed; vacuum full t_uncompressed; +-- check storage +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('t_compressed'); +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('t_uncompressed'); + -- analyze analyze t_uncompressed; analyze t_compressed; @@ -43,6 +59,14 @@ truncate t_compressed; alter table t_uncompressed alter column a type text; alter table t_compressed alter column a type text; +-- check storage +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('t_compressed'); +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('t_uncompressed'); + -- verify cost of scanning an empty table is zero, not NaN explain table t_uncompressed; explain table t_compressed; diff --git a/src/test/regress/sql/columnar_insert.sql b/src/test/regress/sql/columnar_insert.sql index c5ffe73b2..4bc850f11 100644 --- a/src/test/regress/sql/columnar_insert.sql +++ b/src/test/regress/sql/columnar_insert.sql @@ -22,6 +22,10 @@ select count(*) from test_insert_command_data; insert into test_insert_command select * from test_insert_command_data; select count(*) from test_insert_command; +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('test_insert_command'); + SELECT * FROM columnar_test_helpers.chunk_group_consistency; drop table test_insert_command_data; @@ -99,6 +103,10 @@ SELECT pg_column_size(external), pg_column_size(extended) FROM test_toast_columnar; +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('test_toast_columnar'); + SELECT * FROM columnar_test_helpers.chunk_group_consistency; DROP TABLE test_toast_row; @@ -128,6 +136,10 @@ INSERT INTO zero_col_heap SELECT * FROM zero_col_heap; INSERT INTO zero_col SELECT * FROM zero_col_heap; +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('zero_col'); + SELECT relname, stripe_num, chunk_group_count, row_count FROM columnar.stripe a, pg_class b WHERE columnar_test_helpers.columnar_relation_storageid(b.oid)=a.storage_id AND relname = 'zero_col' ORDER BY 1,2,3,4; diff --git a/src/test/regress/sql/columnar_rollback.sql b/src/test/regress/sql/columnar_rollback.sql index 3022ab906..d20c68a3a 100644 --- a/src/test/regress/sql/columnar_rollback.sql +++ b/src/test/regress/sql/columnar_rollback.sql @@ -13,6 +13,10 @@ INSERT INTO t SELECT i, i+1 FROM generate_series(1, 10) i; ROLLBACK; SELECT count(*) FROM t; +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('t'); + -- check stripe metadata also have been rolled-back SELECT count(*) FROM t_stripes; @@ -28,6 +32,11 @@ INSERT INTO t SELECT i, i+1 FROM generate_series(1, 10) i; SELECT count(*) FROM t; -- force flush SAVEPOINT s1; INSERT INTO t SELECT i, i+1 FROM generate_series(1, 10) i; + +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('t'); + SELECT count(*) FROM t; ROLLBACK TO SAVEPOINT s1; SELECT count(*) FROM t; @@ -36,6 +45,10 @@ SELECT count(*) FROM t; INSERT INTO t SELECT i, i+1 FROM generate_series(1, 10) i; COMMIT; +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('t'); + SELECT count(*) FROM t; SELECT count(*) FROM t_stripes; diff --git a/src/test/regress/sql/columnar_test_helpers.sql b/src/test/regress/sql/columnar_test_helpers.sql index d240a98d8..08438a75f 100644 --- a/src/test/regress/sql/columnar_test_helpers.sql +++ b/src/test/regress/sql/columnar_test_helpers.sql @@ -5,6 +5,17 @@ CREATE FUNCTION columnar_relation_storageid(relid oid) RETURNS bigint LANGUAGE C STABLE STRICT AS 'citus', $$columnar_relation_storageid$$; +CREATE OR REPLACE FUNCTION columnar_storage_info( + rel regclass, + version_major OUT int4, + version_minor OUT int4, + storage_id OUT int8, + reserved_stripe_id OUT int8, + reserved_row_number OUT int8, + reserved_offset OUT int8) + STRICT + LANGUAGE c AS 'citus', $$columnar_storage_info$$; + CREATE FUNCTION compression_type_supported(type text) RETURNS boolean AS $$ BEGIN diff --git a/src/test/regress/sql/columnar_truncate.sql b/src/test/regress/sql/columnar_truncate.sql index 701b41ceb..bdbb00c23 100644 --- a/src/test/regress/sql/columnar_truncate.sql +++ b/src/test/regress/sql/columnar_truncate.sql @@ -27,8 +27,16 @@ SELECT * FROM columnar_truncate_test; SELECT * FROM columnar_test_helpers.chunk_group_consistency; +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('columnar_truncate_test'); + TRUNCATE TABLE columnar_truncate_test; +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('columnar_truncate_test'); + SELECT * FROM columnar_test_helpers.chunk_group_consistency; SELECT * FROM columnar_truncate_test; diff --git a/src/test/regress/sql/columnar_vacuum.sql b/src/test/regress/sql/columnar_vacuum.sql index 29babd84e..a2fe24feb 100644 --- a/src/test/regress/sql/columnar_vacuum.sql +++ b/src/test/regress/sql/columnar_vacuum.sql @@ -17,6 +17,10 @@ INSERT INTO t SELECT i, i * i FROM generate_series(21, 30) i; SELECT sum(a), sum(b) FROM t; SELECT count(*) FROM t_stripes; +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('t'); + -- vacuum full should merge stripes together VACUUM FULL t; @@ -25,6 +29,10 @@ SELECT * FROM columnar_test_helpers.chunk_group_consistency; SELECT sum(a), sum(b) FROM t; SELECT count(*) FROM t_stripes; +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('t'); + -- test the case when all data cannot fit into a single stripe SELECT alter_columnar_table_set('t', stripe_row_limit => 1000); INSERT INTO t SELECT i, 2 * i FROM generate_series(1,2500) i; @@ -34,6 +42,10 @@ SELECT count(*) FROM t_stripes; VACUUM FULL t; +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('t'); + SELECT * FROM columnar_test_helpers.chunk_group_consistency; SELECT sum(a), sum(b) FROM t; @@ -95,6 +107,9 @@ INSERT INTO t SELECT i / 5 FROM generate_series(1, 1500) i; COMMIT; VACUUM VERBOSE t; +select + version_major, version_minor, reserved_stripe_id, reserved_row_number, reserved_offset + from columnar_test_helpers.columnar_storage_info('t'); SELECT * FROM columnar_test_helpers.chunk_group_consistency; diff --git a/src/test/regress/sql/upgrade_columnar_metapage_after.sql b/src/test/regress/sql/upgrade_columnar_metapage_after.sql new file mode 100644 index 000000000..e04dbafcd --- /dev/null +++ b/src/test/regress/sql/upgrade_columnar_metapage_after.sql @@ -0,0 +1,45 @@ +\set upgrade_test_old_citus_version `echo "$upgrade_test_old_citus_version"` +SELECT substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int >= 10 AND + substring(:'upgrade_test_old_citus_version', 'v\d+\.(\d+)\.\d+')::int >= 0 +AS upgrade_test_old_citus_version_ge_10_0; +\gset +\if :upgrade_test_old_citus_version_ge_10_0 +\else +\q +\endif + +-- it's not the best practice to define this here, but we don't want to include +-- columnar_test_helpers in upgrade test schedule +CREATE OR REPLACE FUNCTION columnar_storage_info( + rel regclass, + version_major OUT int4, + version_minor OUT int4, + storage_id OUT int8, + reserved_stripe_id OUT int8, + reserved_row_number OUT int8, + reserved_offset OUT int8) +STRICT +LANGUAGE c AS 'citus', 'columnar_storage_info'; + +SET search_path TO upgrade_columnar_metapage, public; + +-- should work since we upgrade metapages when upgrading schema version +INSERT INTO columnar_table_1 VALUES (3); + +-- show that all columnar relation's metapage's are upgraded to "2.0" +SELECT count(*)=0 +FROM (SELECT (columnar_storage_info(c.oid)).* t + FROM pg_class c, pg_am a + WHERE c.relam = a.oid AND amname = 'columnar') t +WHERE t.version_major != 2 and t.version_minor != 0; + +-- print metapage for two of the tables +SELECT columnar_storage_info('columnar_table_1'); +SELECT columnar_storage_info('columnar_table_2'); + +-- table is already upgraded, make sure that upgrade_columnar_metapage is no-op +SELECT citus_internal.upgrade_columnar_storage(c.oid) +FROM pg_class c, pg_am a +WHERE c.relam = a.oid AND amname = 'columnar' and relname = 'columnar_table_2'; + +SELECT columnar_storage_info('columnar_table_2'); diff --git a/src/test/regress/sql/upgrade_columnar_metapage_before.sql b/src/test/regress/sql/upgrade_columnar_metapage_before.sql new file mode 100644 index 000000000..b3208c8c2 --- /dev/null +++ b/src/test/regress/sql/upgrade_columnar_metapage_before.sql @@ -0,0 +1,23 @@ +\set upgrade_test_old_citus_version `echo "$upgrade_test_old_citus_version"` +SELECT substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int >= 10 AND + substring(:'upgrade_test_old_citus_version', 'v\d+\.(\d+)\.\d+')::int >= 0 +AS upgrade_test_old_citus_version_ge_10_0; +\gset +\if :upgrade_test_old_citus_version_ge_10_0 +\else +\q +\endif + +CREATE SCHEMA upgrade_columnar_metapage; +SET search_path TO upgrade_columnar_metapage, public; + +CREATE TABLE columnar_table_1(a INT, b INT) USING columnar; +INSERT INTO columnar_table_1 SELECT i FROM generate_series(160001, 320000) i; + +CREATE TABLE columnar_table_2(b INT) USING columnar; +INSERT INTO columnar_table_2 VALUES (160); + +CREATE TABLE columnar_table_3(b INT) USING columnar; +INSERT INTO columnar_table_3 VALUES (1), (2); + +CREATE TABLE no_data_columnar_table(a INT, b INT, c TEXT) USING columnar;