diff --git a/src/backend/columnar/columnar_metadata.c b/src/backend/columnar/columnar_metadata.c index 0ebb533c7..e1cab2f19 100644 --- a/src/backend/columnar/columnar_metadata.c +++ b/src/backend/columnar/columnar_metadata.c @@ -84,6 +84,8 @@ static Oid ColumnarChunkRelationId(void); static Oid ColumnarChunkGroupRelationId(void); static Oid ColumnarChunkIndexRelationId(void); static Oid ColumnarChunkGroupIndexRelationId(void); +static Oid ColumnarVisibilityRelationId(void); +static Oid ColumnarVisibilityIndexRelationId(void); static Oid ColumnarNamespaceId(void); static uint64 LookupStorageId(RelFileNode relfilenode); static uint64 GetHighestUsedFirstRowNumber(uint64 storageId); @@ -165,6 +167,11 @@ typedef FormData_columnar_options *Form_columnar_options; #define Anum_columnar_chunk_value_decompressed_size 13 #define Anum_columnar_chunk_value_count 14 +/* constants for columnar.visibility */ +#define Natts_columnar_visibility 3 +#define Anum_columnar_visibility_storage_id 1 +#define Anum_columnar_visibility_stripe_num 2 +#define Anum_columnar_visibility_row_num 3 /* * InitColumnarOptions initialized the columnar table options. Meaning it writes the @@ -627,11 +634,11 @@ ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescri * stripe_first_row_number_idx. If no such row exists, then returns NULL. */ StripeMetadata * -FindStripeByRowNumber(Relation relation, uint64 rowNumber, Snapshot snapshot) +FindStripeByRowNumber(RelFileNode relfilenode, uint64 rowNumber, Snapshot snapshot) { StripeMetadata *foundStripeMetadata = NULL; - uint64 storageId = ColumnarStorageGetStorageId(relation, false); + uint64 storageId = LookupStorageId(relfilenode); ScanKeyData scanKey[2]; ScanKeyInit(&scanKey[0], Anum_columnar_stripe_storageid, BTEqualStrategyNumber, F_OIDEQ, Int32GetDatum(storageId)); @@ -899,6 +906,34 @@ ReserveStripe(Relation rel, uint64 sizeBytes, } +void +InsertRemovedRowInformation(RelFileNode relfilenode, uint64 stripenum, uint64 rownum) +{ + uint64 storageId = LookupStorageId(relfilenode); + + bool nulls[Natts_columnar_visibility] = { 0 }; + Datum values[Natts_columnar_visibility] = { + UInt64GetDatum(storageId), + Int64GetDatum(stripenum), /* TODO: find the stripe number per row number */ + Int64GetDatum(rownum) + }; + + Oid columnarVisibilityRelationId = ColumnarVisibilityRelationId(); + Relation columnarVisibility = table_open(columnarVisibilityRelationId, + RowExclusiveLock); + + ModifyState *modifyState = StartModifyRelation(columnarVisibility); + + InsertTupleAndEnforceConstraints(modifyState, values, nulls); + + FinishModifyRelation(modifyState); + + CommandCounterIncrement(); + + table_close(columnarVisibility, RowExclusiveLock); +} + + /* * ReadDataFileStripeList reads the stripe list for a given storageId * in the given snapshot. @@ -998,6 +1033,10 @@ DeleteMetadataRows(RelFileNode relfilenode) Anum_columnar_chunk_storageid, ColumnarChunkIndexRelationId(), storageId); + DeleteStorageFromColumnarMetadataTable(ColumnarVisibilityRelationId(), + Anum_columnar_visibility_storage_id, + ColumnarVisibilityIndexRelationId(), + storageId); } @@ -1318,6 +1357,26 @@ ColumnarChunkGroupIndexRelationId(void) } +/* + * ColumnarVisibilityRelationId returns relation id of columnar.visibility. + */ +static Oid +ColumnarVisibilityRelationId(void) +{ + return get_relname_relid("visibility", ColumnarNamespaceId()); +} + + +/* + * ColumnarVisibilityIndexRelationId returns relation id of columnar.visibility_pkey. + */ +static Oid +ColumnarVisibilityIndexRelationId() +{ + return get_relname_relid("visibility_pkey", ColumnarNamespaceId()); +} + + /* * ColumnarNamespaceId returns namespace id of the schema we store columnar * related tables. diff --git a/src/backend/columnar/columnar_reader.c b/src/backend/columnar/columnar_reader.c index 0f1a1767f..0fc3e370d 100644 --- a/src/backend/columnar/columnar_reader.c +++ b/src/backend/columnar/columnar_reader.c @@ -262,7 +262,8 @@ ColumnarReadRowByRowNumber(Relation relation, uint64 rowNumber, List *neededColumnList, Datum *columnValues, bool *columnNulls, Snapshot snapshot) { - StripeMetadata *stripeMetadata = FindStripeByRowNumber(relation, rowNumber, snapshot); + StripeMetadata *stripeMetadata = FindStripeByRowNumber(relation->rd_node, rowNumber, + snapshot); if (stripeMetadata == NULL) { /* no such row exists */ diff --git a/src/backend/columnar/columnar_tableam.c b/src/backend/columnar/columnar_tableam.c index 804e22512..6c059222a 100644 --- a/src/backend/columnar/columnar_tableam.c +++ b/src/backend/columnar/columnar_tableam.c @@ -584,7 +584,26 @@ columnar_tuple_delete(Relation relation, ItemPointer tid, CommandId cid, Snapshot snapshot, Snapshot crosscheck, bool wait, TM_FailureData *tmfd, bool changingPart) { - elog(ERROR, "columnar_tuple_delete not implemented"); + /* + * columnar_init_write_state allocates the write state in a longer + * lasting context, so no need to worry about it. + */ + ColumnarWriteState *writeState = columnar_init_write_state(relation, + RelationGetDescr(relation), + GetCurrentSubTransactionId()); + + MemoryContext oldContext = MemoryContextSwitchTo( + ColumnarWritePerTupleContext(writeState)); + + ColumnarCheckLogicalReplication(relation); + + uint64 rowNumber = tid_to_row_number(*tid); + ColumnarWriteDeleteRow(writeState, rowNumber); + + MemoryContextSwitchTo(oldContext); + MemoryContextReset(ColumnarWritePerTupleContext(writeState)); + + return TM_Ok; } @@ -594,7 +613,8 @@ columnar_tuple_update(Relation relation, ItemPointer otid, TupleTableSlot *slot, bool wait, TM_FailureData *tmfd, LockTupleMode *lockmode, bool *update_indexes) { - elog(ERROR, "columnar_tuple_update not implemented"); + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("UPDATE not supported for ColumnarScan"))); } @@ -1916,7 +1936,7 @@ ColumnarCheckLogicalReplication(Relation rel) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg( - "cannot insert into columnar table that is a part of a publication"))); + "cannot modify columnar table that is a part of a publication"))); } } diff --git a/src/backend/columnar/columnar_writer.c b/src/backend/columnar/columnar_writer.c index cafef2381..d0d77a9d0 100644 --- a/src/backend/columnar/columnar_writer.c +++ b/src/backend/columnar/columnar_writer.c @@ -33,6 +33,8 @@ #include "columnar/columnar_storage.h" #include "columnar/columnar_version_compat.h" +#include "distributed/listutils.h" + struct ColumnarWriteState { TupleDesc tupleDescriptor; @@ -40,6 +42,7 @@ struct ColumnarWriteState RelFileNode relfilenode; MemoryContext stripeWriteContext; + MemoryContext stripeRemoveContext; MemoryContext perTupleContext; StripeBuffers *stripeBuffers; StripeSkipList *stripeSkipList; @@ -49,6 +52,8 @@ struct ColumnarWriteState List *chunkGroupRowCounts; + List *removedRows; + /* * compressionBuffer buffer is used as temporary storage during * data value compression operation. It is kept here to minimize @@ -65,6 +70,7 @@ static StripeSkipList * CreateEmptyStripeSkipList(uint32 stripeMaxRowCount, uint32 chunkRowCount, uint32 columnCount); static void FlushStripe(ColumnarWriteState *writeState); +static void FlushDeletes(ColumnarWriteState *writeState); static StringInfo SerializeBoolArray(bool *boolArray, uint32 boolArrayLength); static void SerializeSingleDatum(StringInfo datumBuffer, Datum datum, bool datumTypeByValue, int datumTypeLength, @@ -117,6 +123,10 @@ ColumnarBeginWrite(RelFileNode relfilenode, "Stripe Write Memory Context", ALLOCSET_DEFAULT_SIZES); + MemoryContext stripeRemoveContext = AllocSetContextCreate(CurrentMemoryContext, + "Stripe Delete Memory Context", + ALLOCSET_DEFAULT_SIZES); + bool *columnMaskArray = palloc(columnCount * sizeof(bool)); memset(columnMaskArray, true, columnCount); @@ -132,12 +142,15 @@ ColumnarBeginWrite(RelFileNode relfilenode, writeState->stripeSkipList = NULL; writeState->stripeFirstRowNumber = COLUMNAR_INVALID_ROW_NUMBER; writeState->stripeWriteContext = stripeWriteContext; + writeState->stripeRemoveContext = stripeRemoveContext; writeState->chunkData = chunkData; writeState->compressionBuffer = NULL; writeState->perTupleContext = AllocSetContextCreate(CurrentMemoryContext, "Columnar per tuple context", ALLOCSET_DEFAULT_SIZES); + writeState->removedRows = NIL; + return writeState; } @@ -251,6 +264,23 @@ ColumnarWriteRow(ColumnarWriteState *writeState, Datum *columnValues, bool *colu } +void +ColumnarWriteDeleteRow(ColumnarWriteState *writeState, uint64 rowNumber) +{ + MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeRemoveContext); + uint64 *rowNumberCopy = palloc0(sizeof(uint64)); + *rowNumberCopy = rowNumber; + writeState->removedRows = lappend(writeState->removedRows, rowNumberCopy); + + if (list_length(writeState->removedRows) >= writeState->options.stripeRowCount) + { + ColumnarFlushPendingDeletes(writeState); + } + + MemoryContextSwitchTo(oldContext); +} + + /* * ColumnarEndWrite finishes a columnar data load operation. If we have an unflushed * stripe, we flush it. @@ -259,6 +289,7 @@ void ColumnarEndWrite(ColumnarWriteState *writeState) { ColumnarFlushPendingWrites(writeState); + ColumnarFlushPendingDeletes(writeState); MemoryContextDelete(writeState->stripeWriteContext); pfree(writeState->comparisonFunctionArray); @@ -287,6 +318,26 @@ ColumnarFlushPendingWrites(ColumnarWriteState *writeState) } +void +ColumnarFlushPendingDeletes(ColumnarWriteState *writeState) +{ + if (list_length(writeState->removedRows) >= 0) + { + MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeRemoveContext); + + /* flush deletes to visibility table */ + FlushDeletes(writeState); + MemoryContextReset(writeState->stripeRemoveContext); + + /* remove cached data for deleted rows */ + /* TODO: this is modelled based on ColumnarFlushPendingWrites, might need pfree */ + writeState->removedRows = NIL; + + MemoryContextSwitchTo(oldContext); + } +} + + /* * ColumnarWritePerTupleContext * @@ -499,6 +550,20 @@ FlushStripe(ColumnarWriteState *writeState) } +static void +FlushDeletes(ColumnarWriteState *writeState) +{ + uint64 *rownum = NULL; + foreach_ptr(rownum, writeState->removedRows) + { + StripeMetadata *metadata = FindStripeByRowNumber(writeState->relfilenode, *rownum, + NULL); + InsertRemovedRowInformation(writeState->relfilenode, metadata->id, + (*rownum) - metadata->firstRowNumber); + } +} + + /* * SerializeBoolArray serializes the given boolean array and returns the result * as a StringInfo. This function packs every 8 boolean values into one byte. @@ -748,5 +813,6 @@ CopyStringInfo(StringInfo sourceString) bool ContainsPendingWrites(ColumnarWriteState *state) { - return state->stripeBuffers != NULL && state->stripeBuffers->rowCount != 0; + return state->stripeBuffers != NULL && state->stripeBuffers->rowCount != 0 && + list_length(state->removedRows) == 0; } diff --git a/src/backend/columnar/sql/columnar--10.1-1--10.2-1.sql b/src/backend/columnar/sql/columnar--10.1-1--10.2-1.sql index 0666d2f54..06f0a34a3 100644 --- a/src/backend/columnar/sql/columnar--10.1-1--10.2-1.sql +++ b/src/backend/columnar/sql/columnar--10.1-1--10.2-1.sql @@ -30,3 +30,10 @@ $$; -- upgrade storage for all columnar relations SELECT citus_internal.upgrade_columnar_storage(c.oid) FROM pg_class c, pg_am a WHERE c.relam = a.oid AND amname = 'columnar'; + +CREATE TABLE columnar.visibility ( + storage_id bigint NOT NULL, + stripe_num bigint NOT NULL, + row_num bigint NOT NULL, + PRIMARY KEY (storage_id, stripe_num, row_num) +) WITH (user_catalog_table = true); diff --git a/src/backend/columnar/write_state_management.c b/src/backend/columnar/write_state_management.c index 69860ad57..0866000cf 100644 --- a/src/backend/columnar/write_state_management.c +++ b/src/backend/columnar/write_state_management.c @@ -215,6 +215,7 @@ FlushWriteStateForRelfilenode(Oid relfilenode, SubTransactionId currentSubXid) if (stackEntry->subXid == currentSubXid) { ColumnarFlushPendingWrites(stackEntry->writeState); + ColumnarFlushPendingDeletes(stackEntry->writeState); } } } diff --git a/src/include/columnar/columnar.h b/src/include/columnar/columnar.h index e4770acc2..cef822e1b 100644 --- a/src/include/columnar/columnar.h +++ b/src/include/columnar/columnar.h @@ -203,7 +203,9 @@ extern ColumnarWriteState * ColumnarBeginWrite(RelFileNode relfilenode, TupleDesc tupleDescriptor); extern uint64 ColumnarWriteRow(ColumnarWriteState *state, Datum *columnValues, bool *columnNulls); +extern void ColumnarWriteDeleteRow(ColumnarWriteState *writeState, uint64 rowNumber); extern void ColumnarFlushPendingWrites(ColumnarWriteState *state); +extern void ColumnarFlushPendingDeletes(ColumnarWriteState *writeState); extern void ColumnarEndWrite(ColumnarWriteState *state); extern bool ContainsPendingWrites(ColumnarWriteState *state); extern MemoryContext ColumnarWritePerTupleContext(ColumnarWriteState *state); @@ -246,6 +248,8 @@ extern StripeMetadata ReserveStripe(Relation rel, uint64 size, uint64 rowCount, uint64 columnCount, uint64 chunkCount, uint64 chunkGroupRowCount, uint64 stripeFirstRowNumber); +extern void InsertRemovedRowInformation(RelFileNode relfilenode, uint64 stripenum, + uint64 rownum); extern void SaveStripeSkipList(RelFileNode relfilenode, uint64 stripe, StripeSkipList *stripeSkipList, TupleDesc tupleDescriptor); @@ -254,7 +258,7 @@ extern void SaveChunkGroups(RelFileNode relfilenode, uint64 stripe, extern StripeSkipList * ReadStripeSkipList(RelFileNode relfilenode, uint64 stripe, TupleDesc tupleDescriptor, uint32 chunkCount); -extern StripeMetadata * FindStripeByRowNumber(Relation relation, uint64 rowNumber, +extern StripeMetadata * FindStripeByRowNumber(RelFileNode relfilenode, uint64 rowNumber, Snapshot snapshot); extern StripeMetadata * FindStripeWithHighestRowNumber(Relation relation, Snapshot snapshot);