insert tuples in columnar.visibility for deleted rows

columnar/delete
Nils Dijk 2021-06-18 15:55:27 +02:00
parent f211b3278f
commit d11146ae5b
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
7 changed files with 166 additions and 8 deletions

View File

@ -84,6 +84,8 @@ static Oid ColumnarChunkRelationId(void);
static Oid ColumnarChunkGroupRelationId(void);
static Oid ColumnarChunkIndexRelationId(void);
static Oid ColumnarChunkGroupIndexRelationId(void);
static Oid ColumnarVisibilityRelationId(void);
static Oid ColumnarVisibilityIndexRelationId(void);
static Oid ColumnarNamespaceId(void);
static uint64 LookupStorageId(RelFileNode relfilenode);
static uint64 GetHighestUsedFirstRowNumber(uint64 storageId);
@ -165,6 +167,11 @@ typedef FormData_columnar_options *Form_columnar_options;
#define Anum_columnar_chunk_value_decompressed_size 13
#define Anum_columnar_chunk_value_count 14
/* constants for columnar.visibility */
#define Natts_columnar_visibility 3
#define Anum_columnar_visibility_storage_id 1
#define Anum_columnar_visibility_stripe_num 2
#define Anum_columnar_visibility_row_num 3
/*
* InitColumnarOptions initialized the columnar table options. Meaning it writes the
@ -627,11 +634,11 @@ ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescri
* stripe_first_row_number_idx. If no such row exists, then returns NULL.
*/
StripeMetadata *
FindStripeByRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot)
FindStripeByRowNumber(RelFileNode relfilenode, uint64 rowNumber, Snapshot snapshot)
{
StripeMetadata *foundStripeMetadata = NULL;
uint64 storageId = ColumnarStorageGetStorageId(relation, false);
uint64 storageId = LookupStorageId(relfilenode);
ScanKeyData scanKey[2];
ScanKeyInit(&scanKey[0], Anum_columnar_stripe_storageid,
BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(storageId));
@ -899,6 +906,34 @@ ReserveStripe(Relation rel, uint64 sizeBytes,
}
void
InsertRemovedRowInformation(RelFileNode relfilenode, uint64 stripenum, uint64 rownum)
{
uint64 storageId = LookupStorageId(relfilenode);
bool nulls[Natts_columnar_visibility] = { 0 };
Datum values[Natts_columnar_visibility] = {
UInt64GetDatum(storageId),
Int64GetDatum(stripenum), /* TODO: find the stripe number per row number */
Int64GetDatum(rownum)
};
Oid columnarVisibilityRelationId = ColumnarVisibilityRelationId();
Relation columnarVisibility = table_open(columnarVisibilityRelationId,
RowExclusiveLock);
ModifyState *modifyState = StartModifyRelation(columnarVisibility);
InsertTupleAndEnforceConstraints(modifyState, values, nulls);
FinishModifyRelation(modifyState);
CommandCounterIncrement();
table_close(columnarVisibility, RowExclusiveLock);
}
/*
* ReadDataFileStripeList reads the stripe list for a given storageId
* in the given snapshot.
@ -998,6 +1033,10 @@ DeleteMetadataRows(RelFileNode relfilenode)
Anum_columnar_chunk_storageid,
ColumnarChunkIndexRelationId(),
storageId);
DeleteStorageFromColumnarMetadataTable(ColumnarVisibilityRelationId(),
Anum_columnar_visibility_storage_id,
ColumnarVisibilityIndexRelationId(),
storageId);
}
@ -1318,6 +1357,26 @@ ColumnarChunkGroupIndexRelationId(void)
}
/*
* ColumnarVisibilityRelationId returns relation id of columnar.visibility.
*/
static Oid
ColumnarVisibilityRelationId(void)
{
return get_relname_relid("visibility", ColumnarNamespaceId());
}
/*
* ColumnarVisibilityIndexRelationId returns relation id of columnar.visibility_pkey.
*/
static Oid
ColumnarVisibilityIndexRelationId()
{
return get_relname_relid("visibility_pkey", ColumnarNamespaceId());
}
/*
* ColumnarNamespaceId returns namespace id of the schema we store columnar
* related tables.

View File

@ -262,7 +262,8 @@ ColumnarReadRowByRowNumber(Relation relation, uint64 rowNumber,
List *neededColumnList, Datum *columnValues,
bool *columnNulls, Snapshot snapshot)
{
StripeMetadata *stripeMetadata = FindStripeByRowNumber(relation, rowNumber, snapshot);
StripeMetadata *stripeMetadata = FindStripeByRowNumber(relation->rd_node, rowNumber,
snapshot);
if (stripeMetadata == NULL)
{
/* no such row exists */

View File

@ -584,7 +584,26 @@ columnar_tuple_delete(Relation relation, ItemPointer tid, CommandId cid,
Snapshot snapshot, Snapshot crosscheck, bool wait,
TM_FailureData *tmfd, bool changingPart)
{
elog(ERROR, "columnar_tuple_delete not implemented");
/*
* columnar_init_write_state allocates the write state in a longer
* lasting context, so no need to worry about it.
*/
ColumnarWriteState *writeState = columnar_init_write_state(relation,
RelationGetDescr(relation),
GetCurrentSubTransactionId());
MemoryContext oldContext = MemoryContextSwitchTo(
ColumnarWritePerTupleContext(writeState));
ColumnarCheckLogicalReplication(relation);
uint64 rowNumber = tid_to_row_number(*tid);
ColumnarWriteDeleteRow(writeState, rowNumber);
MemoryContextSwitchTo(oldContext);
MemoryContextReset(ColumnarWritePerTupleContext(writeState));
return TM_Ok;
}
@ -594,7 +613,8 @@ columnar_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot,
bool wait, TM_FailureData *tmfd,
LockTupleMode *lockmode, bool *update_indexes)
{
elog(ERROR, "columnar_tuple_update not implemented");
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("UPDATE not supported for ColumnarScan")));
}
@ -1916,7 +1936,7 @@ ColumnarCheckLogicalReplication(Relation rel)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg(
"cannot insert into columnar table that is a part of a publication")));
"cannot modify columnar table that is a part of a publication")));
}
}

View File

@ -33,6 +33,8 @@
#include "columnar/columnar_storage.h"
#include "columnar/columnar_version_compat.h"
#include "distributed/listutils.h"
struct ColumnarWriteState
{
TupleDesc tupleDescriptor;
@ -40,6 +42,7 @@ struct ColumnarWriteState
RelFileNode relfilenode;
MemoryContext stripeWriteContext;
MemoryContext stripeRemoveContext;
MemoryContext perTupleContext;
StripeBuffers *stripeBuffers;
StripeSkipList *stripeSkipList;
@ -49,6 +52,8 @@ struct ColumnarWriteState
List *chunkGroupRowCounts;
List *removedRows;
/*
* compressionBuffer buffer is used as temporary storage during
* data value compression operation. It is kept here to minimize
@ -65,6 +70,7 @@ static StripeSkipList * CreateEmptyStripeSkipList(uint32 stripeMaxRowCount,
uint32 chunkRowCount,
uint32 columnCount);
static void FlushStripe(ColumnarWriteState *writeState);
static void FlushDeletes(ColumnarWriteState *writeState);
static StringInfo SerializeBoolArray(bool *boolArray, uint32 boolArrayLength);
static void SerializeSingleDatum(StringInfo datumBuffer, Datum datum,
bool datumTypeByValue, int datumTypeLength,
@ -117,6 +123,10 @@ ColumnarBeginWrite(RelFileNode relfilenode,
"Stripe Write Memory Context",
ALLOCSET_DEFAULT_SIZES);
MemoryContext stripeRemoveContext = AllocSetContextCreate(CurrentMemoryContext,
"Stripe Delete Memory Context",
ALLOCSET_DEFAULT_SIZES);
bool *columnMaskArray = palloc(columnCount * sizeof(bool));
memset(columnMaskArray, true, columnCount);
@ -132,12 +142,15 @@ ColumnarBeginWrite(RelFileNode relfilenode,
writeState->stripeSkipList = NULL;
writeState->stripeFirstRowNumber = COLUMNAR_INVALID_ROW_NUMBER;
writeState->stripeWriteContext = stripeWriteContext;
writeState->stripeRemoveContext = stripeRemoveContext;
writeState->chunkData = chunkData;
writeState->compressionBuffer = NULL;
writeState->perTupleContext = AllocSetContextCreate(CurrentMemoryContext,
"Columnar per tuple context",
ALLOCSET_DEFAULT_SIZES);
writeState->removedRows = NIL;
return writeState;
}
@ -251,6 +264,23 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu
}
void
ColumnarWriteDeleteRow(ColumnarWriteState *writeState, uint64 rowNumber)
{
MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeRemoveContext);
uint64 *rowNumberCopy = palloc0(sizeof(uint64));
*rowNumberCopy = rowNumber;
writeState->removedRows = lappend(writeState->removedRows, rowNumberCopy);
if (list_length(writeState->removedRows) >= writeState->options.stripeRowCount)
{
ColumnarFlushPendingDeletes(writeState);
}
MemoryContextSwitchTo(oldContext);
}
/*
* ColumnarEndWrite finishes a columnar data load operation. If we have an unflushed
* stripe, we flush it.
@ -259,6 +289,7 @@ void
ColumnarEndWrite(ColumnarWriteState *writeState)
{
ColumnarFlushPendingWrites(writeState);
ColumnarFlushPendingDeletes(writeState);
MemoryContextDelete(writeState->stripeWriteContext);
pfree(writeState->comparisonFunctionArray);
@ -287,6 +318,26 @@ ColumnarFlushPendingWrites(ColumnarWriteState *writeState)
}
void
ColumnarFlushPendingDeletes(ColumnarWriteState *writeState)
{
if (list_length(writeState->removedRows) >= 0)
{
MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeRemoveContext);
/* flush deletes to visibility table */
FlushDeletes(writeState);
MemoryContextReset(writeState->stripeRemoveContext);
/* remove cached data for deleted rows */
/* TODO: this is modelled based on ColumnarFlushPendingWrites, might need pfree */
writeState->removedRows = NIL;
MemoryContextSwitchTo(oldContext);
}
}
/*
* ColumnarWritePerTupleContext
*
@ -499,6 +550,20 @@ FlushStripe(ColumnarWriteState *writeState)
}
static void
FlushDeletes(ColumnarWriteState *writeState)
{
uint64 *rownum = NULL;
foreach_ptr(rownum, writeState->removedRows)
{
StripeMetadata *metadata = FindStripeByRowNumber(writeState->relfilenode, *rownum,
NULL);
InsertRemovedRowInformation(writeState->relfilenode, metadata->id,
(*rownum) - metadata->firstRowNumber);
}
}
/*
* SerializeBoolArray serializes the given boolean array and returns the result
* as a StringInfo. This function packs every 8 boolean values into one byte.
@ -748,5 +813,6 @@ CopyStringInfo(StringInfo sourceString)
bool
ContainsPendingWrites(ColumnarWriteState *state)
{
return state->stripeBuffers != NULL && state->stripeBuffers->rowCount != 0;
return state->stripeBuffers != NULL && state->stripeBuffers->rowCount != 0 &&
list_length(state->removedRows) == 0;
}

View File

@ -30,3 +30,10 @@ $$;
-- upgrade storage for all columnar relations
SELECT citus_internal.upgrade_columnar_storage(c.oid) FROM pg_class c, pg_am a
WHERE c.relam = a.oid AND amname = 'columnar';
CREATE TABLE columnar.visibility (
storage_id bigint NOT NULL,
stripe_num bigint NOT NULL,
row_num bigint NOT NULL,
PRIMARY KEY (storage_id, stripe_num, row_num)
) WITH (user_catalog_table = true);

View File

@ -215,6 +215,7 @@ FlushWriteStateForRelfilenode(Oid relfilenode, SubTransactionId currentSubXid)
if (stackEntry->subXid == currentSubXid)
{
ColumnarFlushPendingWrites(stackEntry->writeState);
ColumnarFlushPendingDeletes(stackEntry->writeState);
}
}
}

View File

@ -203,7 +203,9 @@ extern ColumnarWriteState * ColumnarBeginWrite(RelFileNode relfilenode,
TupleDesc tupleDescriptor);
extern uint64 ColumnarWriteRow(ColumnarWriteState *state, Datum *columnValues,
bool *columnNulls);
extern void ColumnarWriteDeleteRow(ColumnarWriteState *writeState, uint64 rowNumber);
extern void ColumnarFlushPendingWrites(ColumnarWriteState *state);
extern void ColumnarFlushPendingDeletes(ColumnarWriteState *writeState);
extern void ColumnarEndWrite(ColumnarWriteState *state);
extern bool ContainsPendingWrites(ColumnarWriteState *state);
extern MemoryContext ColumnarWritePerTupleContext(ColumnarWriteState *state);
@ -246,6 +248,8 @@ extern StripeMetadata ReserveStripe(Relation rel, uint64 size,
uint64 rowCount, uint64 columnCount,
uint64 chunkCount, uint64 chunkGroupRowCount,
uint64 stripeFirstRowNumber);
extern void InsertRemovedRowInformation(RelFileNode relfilenode, uint64 stripenum,
uint64 rownum);
extern void SaveStripeSkipList(RelFileNode relfilenode, uint64 stripe,
StripeSkipList *stripeSkipList,
TupleDesc tupleDescriptor);
@ -254,7 +258,7 @@ extern void SaveChunkGroups(RelFileNode relfilenode, uint64 stripe,
extern StripeSkipList * ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe,
TupleDesc tupleDescriptor,
uint32 chunkCount);
extern StripeMetadata * FindStripeByRowNumber(Relation relation, uint64 rowNumber,
extern StripeMetadata * FindStripeByRowNumber(RelFileNode relfilenode, uint64 rowNumber,
Snapshot snapshot);
extern StripeMetadata * FindStripeWithHighestRowNumber(Relation relation,
Snapshot snapshot);