mirror of https://github.com/citusdata/citus.git
Use RelFileLocator in PG 16 AND vacuum together
parent
3660b8fb0e
commit
158b003366
|
@ -126,8 +126,6 @@ static void ColumnarProcessUtility(PlannedStmt *pstmt,
|
|||
static bool ConditionalLockRelationWithTimeout(Relation rel, LOCKMODE lockMode,
|
||||
int timeout, int retryInterval);
|
||||
static List * NeededColumnsList(TupleDesc tupdesc, Bitmapset *attr_needed);
|
||||
static void LogRelationStats(Relation rel, int elevel);
|
||||
static void TruncateColumnar(Relation rel, int elevel);
|
||||
static HeapTuple ColumnarSlotCopyHeapTuple(TupleTableSlot *slot);
|
||||
static void ColumnarCheckLogicalReplication(Relation rel);
|
||||
static Datum * detoast_values(TupleDesc tupleDesc, Datum *orig_values, bool *isnull);
|
||||
|
@ -1079,196 +1077,6 @@ For now, comment out so that we can compile
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* LogRelationStats logs statistics as the output of the VACUUM VERBOSE.
|
||||
*/
|
||||
static void
|
||||
LogRelationStats(Relation rel, int elevel)
|
||||
{
|
||||
ListCell *stripeMetadataCell = NULL;
|
||||
RelFileLocator relfilelocator = rel->rd_locator;
|
||||
StringInfo infoBuf = makeStringInfo();
|
||||
|
||||
int compressionStats[COMPRESSION_COUNT] = { 0 };
|
||||
uint64 totalStripeLength = 0;
|
||||
uint64 tupleCount = 0;
|
||||
uint64 chunkCount = 0;
|
||||
TupleDesc tupdesc = RelationGetDescr(rel);
|
||||
uint64 droppedChunksWithData = 0;
|
||||
uint64 totalDecompressedLength = 0;
|
||||
|
||||
List *stripeList = StripesForRelfilenode(relfilelocator);
|
||||
int stripeCount = list_length(stripeList);
|
||||
|
||||
foreach(stripeMetadataCell, stripeList)
|
||||
{
|
||||
StripeMetadata *stripe = lfirst(stripeMetadataCell);
|
||||
StripeSkipList *skiplist = ReadStripeSkipList(relfilelocator, stripe->id,
|
||||
RelationGetDescr(rel),
|
||||
stripe->chunkCount,
|
||||
GetTransactionSnapshot());
|
||||
for (uint32 column = 0; column < skiplist->columnCount; column++)
|
||||
{
|
||||
bool attrDropped = tupdesc->attrs[column].attisdropped;
|
||||
for (uint32 chunk = 0; chunk < skiplist->chunkCount; chunk++)
|
||||
{
|
||||
ColumnChunkSkipNode *skipnode =
|
||||
&skiplist->chunkSkipNodeArray[column][chunk];
|
||||
|
||||
/* ignore zero length chunks for dropped attributes */
|
||||
if (skipnode->valueLength > 0)
|
||||
{
|
||||
compressionStats[skipnode->valueCompressionType]++;
|
||||
chunkCount++;
|
||||
|
||||
if (attrDropped)
|
||||
{
|
||||
droppedChunksWithData++;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* We don't compress exists buffer, so its compressed & decompressed
|
||||
* lengths are the same.
|
||||
*/
|
||||
totalDecompressedLength += skipnode->existsLength;
|
||||
totalDecompressedLength += skipnode->decompressedValueSize;
|
||||
}
|
||||
}
|
||||
|
||||
tupleCount += stripe->rowCount;
|
||||
totalStripeLength += stripe->dataLength;
|
||||
}
|
||||
|
||||
uint64 relPages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
|
||||
RelationCloseSmgr(rel);
|
||||
|
||||
Datum storageId = DirectFunctionCall1(columnar_relation_storageid,
|
||||
ObjectIdGetDatum(RelationGetRelid(rel)));
|
||||
|
||||
double compressionRate = totalStripeLength ?
|
||||
(double) totalDecompressedLength / totalStripeLength :
|
||||
1.0;
|
||||
|
||||
appendStringInfo(infoBuf, "storage id: %ld\n", DatumGetInt64(storageId));
|
||||
appendStringInfo(infoBuf, "total file size: %ld, total data size: %ld\n",
|
||||
relPages * BLCKSZ, totalStripeLength);
|
||||
appendStringInfo(infoBuf, "compression rate: %.2fx\n", compressionRate);
|
||||
appendStringInfo(infoBuf,
|
||||
"total row count: %ld, stripe count: %d, "
|
||||
"average rows per stripe: %ld\n",
|
||||
tupleCount, stripeCount,
|
||||
stripeCount ? tupleCount / stripeCount : 0);
|
||||
appendStringInfo(infoBuf,
|
||||
"chunk count: %ld"
|
||||
", containing data for dropped columns: %ld",
|
||||
chunkCount, droppedChunksWithData);
|
||||
for (int compressionType = 0; compressionType < COMPRESSION_COUNT; compressionType++)
|
||||
{
|
||||
const char *compressionName = CompressionTypeStr(compressionType);
|
||||
|
||||
/* skip if this compression algorithm has not been compiled */
|
||||
if (compressionName == NULL)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
/* skip if no chunks use this compression type */
|
||||
if (compressionStats[compressionType] == 0)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
appendStringInfo(infoBuf,
|
||||
", %s compressed: %d",
|
||||
compressionName,
|
||||
compressionStats[compressionType]);
|
||||
}
|
||||
appendStringInfoString(infoBuf, "\n");
|
||||
|
||||
ereport(elevel, (errmsg("statistics for \"%s\":\n%s", RelationGetRelationName(rel),
|
||||
infoBuf->data)));
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TruncateColumnar truncates the unused space at the end of main fork for
|
||||
* a columnar table. This unused space can be created by aborted transactions.
|
||||
*
|
||||
* This implementation is based on heap_vacuum_rel in vacuumlazy.c with some
|
||||
* changes so it suits columnar store relations.
|
||||
*/
|
||||
static void
|
||||
TruncateColumnar(Relation rel, int elevel)
|
||||
{
|
||||
PGRUsage ru0;
|
||||
|
||||
pg_rusage_init(&ru0);
|
||||
|
||||
/* Report that we are now truncating */
|
||||
pgstat_progress_update_param(PROGRESS_VACUUM_PHASE,
|
||||
PROGRESS_VACUUM_PHASE_TRUNCATE);
|
||||
|
||||
|
||||
/*
|
||||
* We need access exclusive lock on the relation in order to do
|
||||
* truncation. If we can't get it, give up rather than waiting --- we
|
||||
* don't want to block other backends, and we don't want to deadlock
|
||||
* (which is quite possible considering we already hold a lower-grade
|
||||
* lock).
|
||||
*
|
||||
* The decisions for AccessExclusiveLock and conditional lock with
|
||||
* a timeout is based on lazy_truncate_heap in vacuumlazy.c.
|
||||
*/
|
||||
if (!ConditionalLockRelationWithTimeout(rel, AccessExclusiveLock,
|
||||
VACUUM_TRUNCATE_LOCK_TIMEOUT,
|
||||
VACUUM_TRUNCATE_LOCK_WAIT_INTERVAL))
|
||||
{
|
||||
/*
|
||||
* We failed to establish the lock in the specified number of
|
||||
* retries. This means we give up truncating.
|
||||
*/
|
||||
ereport(elevel,
|
||||
(errmsg("\"%s\": stopping truncate due to conflicting lock request",
|
||||
RelationGetRelationName(rel))));
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Due to the AccessExclusive lock there's no danger that
|
||||
* new stripes be added beyond highestPhysicalAddress while
|
||||
* we're truncating.
|
||||
*/
|
||||
uint64 newDataReservation = Max(GetHighestUsedAddress(rel->rd_locator) + 1,
|
||||
ColumnarFirstLogicalOffset);
|
||||
|
||||
BlockNumber old_rel_pages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
|
||||
|
||||
if (!ColumnarStorageTruncate(rel, newDataReservation))
|
||||
{
|
||||
UnlockRelation(rel, AccessExclusiveLock);
|
||||
return;
|
||||
}
|
||||
|
||||
BlockNumber new_rel_pages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
|
||||
|
||||
/*
|
||||
* We can release the exclusive lock as soon as we have truncated.
|
||||
* Other backends can't safely access the relation until they have
|
||||
* processed the smgr invalidation that smgrtruncate sent out ... but
|
||||
* that should happen as part of standard invalidation processing once
|
||||
* they acquire lock on the relation.
|
||||
*/
|
||||
UnlockRelation(rel, AccessExclusiveLock);
|
||||
|
||||
ereport(elevel,
|
||||
(errmsg("\"%s\": truncated %u to %u pages",
|
||||
RelationGetRelationName(rel),
|
||||
old_rel_pages, new_rel_pages),
|
||||
errdetail_internal("%s", pg_rusage_show(&ru0))));
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ConditionalLockRelationWithTimeout tries to acquire a relation lock until
|
||||
* it either succeeds or timesout. It doesn't enter wait queue and instead it
|
||||
|
|
|
@ -27,8 +27,8 @@
|
|||
#include "utils/guc.h"
|
||||
#include "utils/memutils.h"
|
||||
#include "utils/rel.h"
|
||||
#include "utils/relfilenodemap.h"
|
||||
|
||||
#include "storage/relfilelocator.h"
|
||||
#include "utils/relfilenumbermap.h"
|
||||
#include "columnar/columnar.h"
|
||||
#include "columnar/columnar_storage.h"
|
||||
#include "columnar/columnar_version_compat.h"
|
||||
|
@ -174,7 +174,7 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu
|
|||
writeState->stripeSkipList = stripeSkipList;
|
||||
writeState->compressionBuffer = makeStringInfo();
|
||||
|
||||
Oid relationId = RelidByRelfilenode(writeState->relfilelocator.spcOid,
|
||||
Oid relationId = RelidByRelfilenumber(writeState->relfilelocator.spcOid,
|
||||
writeState->relfilelocator.relNumber);
|
||||
Relation relation = relation_open(relationId, NoLock);
|
||||
writeState->emptyStripeReservation =
|
||||
|
@ -393,8 +393,8 @@ FlushStripe(ColumnarWriteState *writeState)
|
|||
|
||||
elog(DEBUG1, "Flushing Stripe of size %d", stripeBuffers->rowCount);
|
||||
|
||||
Oid relationId = RelidByRelfilenode(writeState->relfilelocator.spcNode,
|
||||
writeState->relfilelocator.relNode);
|
||||
Oid relationId = RelidByRelfilenumber(writeState->relfilelocator.spcOid,
|
||||
writeState->relfilelocator.relNumber);
|
||||
Relation relation = relation_open(relationId, NoLock);
|
||||
|
||||
/*
|
||||
|
|
|
@ -83,7 +83,6 @@
|
|||
#include "utils/memutils.h"
|
||||
#include "utils/palloc.h"
|
||||
#include "utils/rel.h"
|
||||
#include "utils/relfilenodemap.h"
|
||||
#include "utils/relmapper.h"
|
||||
#include "utils/resowner.h"
|
||||
#include "utils/syscache.h"
|
||||
|
|
Loading…
Reference in New Issue