diff --git a/src/backend/columnar/cstore_fdw.c b/src/backend/columnar/cstore_fdw.c index 19b3d1847..634a76f5d 100644 --- a/src/backend/columnar/cstore_fdw.c +++ b/src/backend/columnar/cstore_fdw.c @@ -549,7 +549,7 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString) #endif /* init state to write to the cstore file */ - writeState = CStoreBeginWrite(relation, + writeState = CStoreBeginWrite(relation->rd_node, cstoreOptions->compressionType, cstoreOptions->stripeRowCount, cstoreOptions->blockRowCount, @@ -1992,13 +1992,16 @@ CStoreBeginForeignInsert(ModifyTableState *modifyTableState, ResultRelInfo *rela CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableOid); TupleDesc tupleDescriptor = RelationGetDescr(relationInfo->ri_RelationDesc); - TableWriteState *writeState = CStoreBeginWrite(relation, + TableWriteState *writeState = CStoreBeginWrite(relation->rd_node, cstoreOptions->compressionType, cstoreOptions->stripeRowCount, cstoreOptions->blockRowCount, tupleDescriptor); relationInfo->ri_FdwState = (void *) writeState; + + /* keep the lock */ + relation_close(relation, NoLock); } @@ -2055,10 +2058,7 @@ CStoreEndForeignInsert(EState *executorState, ResultRelInfo *relationInfo) /* writeState is NULL during Explain queries */ if (writeState != NULL) { - Relation relation = writeState->relation; - CStoreEndWrite(writeState); - heap_close(relation, RowExclusiveLock); } } diff --git a/src/backend/columnar/cstore_metadata_tables.c b/src/backend/columnar/cstore_metadata_tables.c index 301dd0c91..26e176535 100644 --- a/src/backend/columnar/cstore_metadata_tables.c +++ b/src/backend/columnar/cstore_metadata_tables.c @@ -134,18 +134,19 @@ InitCStoreDataFileMetadata(Oid relfilenode, int blockRowCount, int stripeRowCoun { NameData compressionName = { 0 }; - namestrcpy(&compressionName, CompressionTypeStr(compression)); - bool nulls[Natts_cstore_data_files] = { 0 }; Datum values[Natts_cstore_data_files] = { ObjectIdGetDatum(relfilenode), 
Int32GetDatum(blockRowCount), Int32GetDatum(stripeRowCount), - NameGetDatum(&compressionName), + 0, /* to be filled below */ Int32GetDatum(CSTORE_VERSION_MAJOR), Int32GetDatum(CSTORE_VERSION_MINOR) }; + namestrcpy(&compressionName, CompressionTypeStr(compression)); + values[Anum_cstore_data_files_compression - 1] = NameGetDatum(&compressionName); + DeleteDataFileMetadataRowIfExists(relfilenode); Oid cstoreDataFilesOid = CStoreDataFilesRelationId(); @@ -171,6 +172,7 @@ UpdateCStoreDataFileMetadata(Oid relfilenode, int blockRowCount, int stripeRowCo Datum values[Natts_cstore_data_files] = { 0 }; bool isnull[Natts_cstore_data_files] = { 0 }; bool replace[Natts_cstore_data_files] = { 0 }; + bool changed = false; Relation cstoreDataFiles = heap_open(CStoreDataFilesRelationId(), RowExclusiveLock); TupleDesc tupleDescriptor = RelationGetDescr(cstoreDataFiles); @@ -192,7 +194,6 @@ UpdateCStoreDataFileMetadata(Oid relfilenode, int blockRowCount, int stripeRowCo Form_cstore_data_files metadata = (Form_cstore_data_files) GETSTRUCT(heapTuple); - bool changed = false; if (metadata->block_row_count != blockRowCount) { values[Anum_cstore_data_files_block_row_count - 1] = Int32GetDatum(blockRowCount); diff --git a/src/backend/columnar/cstore_tableam.c b/src/backend/columnar/cstore_tableam.c index 6ede404bb..1669b571c 100644 --- a/src/backend/columnar/cstore_tableam.c +++ b/src/backend/columnar/cstore_tableam.c @@ -68,10 +68,7 @@ typedef struct CStoreScanDescData typedef struct CStoreScanDescData *CStoreScanDesc; -static TableWriteState *CStoreWriteState = NULL; -static ExecutorEnd_hook_type PreviousExecutorEndHook = NULL; -static MemoryContext CStoreContext = NULL; -static object_access_hook_type prevObjectAccessHook = NULL; +static object_access_hook_type PrevObjectAccessHook = NULL; static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL; /* forward declaration for static functions */ @@ -122,13 +119,13 @@ CStoreTableAMDefaultOptions() * CStoreTableAMGetOptions returns 
the options based on a relation. It is advised the * relation is a cstore table am table, if not it will raise an error */ -static CStoreOptions * -CStoreTableAMGetOptions(Relation rel) +CStoreOptions * +CStoreTableAMGetOptions(Oid relfilenode) { - Assert(rel != NULL); + Assert(OidIsValid(relfilenode)); CStoreOptions *cstoreOptions = palloc0(sizeof(CStoreOptions)); - DataFileMetadata *metadata = ReadDataFileMetadata(rel->rd_node.relNode, false); + DataFileMetadata *metadata = ReadDataFileMetadata(relfilenode, false); cstoreOptions->compressionType = metadata->compression; cstoreOptions->stripeRowCount = metadata->stripeRowCount; cstoreOptions->blockRowCount = metadata->blockRowCount; @@ -136,66 +133,6 @@ CStoreTableAMGetOptions(Relation rel) } -static MemoryContext -GetCStoreMemoryContext() -{ - if (CStoreContext == NULL) - { - CStoreContext = AllocSetContextCreate(TopMemoryContext, "cstore context", - ALLOCSET_DEFAULT_SIZES); - } - return CStoreContext; -} - - -static void -ResetCStoreMemoryContext() -{ - if (CStoreContext != NULL) - { - MemoryContextReset(CStoreContext); - } -} - - -static void -cstore_init_write_state(Relation relation) -{ - if (CStoreWriteState != NULL) - { - /* TODO: consider whether it's possible for a new write to start */ - /* before an old one is flushed */ - Assert(CStoreWriteState->relation->rd_id == relation->rd_id); - } - - if (CStoreWriteState == NULL) - { - CStoreOptions *cstoreOptions = CStoreTableAMGetOptions(relation); - TupleDesc tupdesc = RelationGetDescr(relation); - - elog(LOG, "initializing write state for relation %d", relation->rd_id); - CStoreWriteState = CStoreBeginWrite(relation, - cstoreOptions->compressionType, - cstoreOptions->stripeRowCount, - cstoreOptions->blockRowCount, - tupdesc); - } -} - - -static void -cstore_free_write_state() -{ - if (CStoreWriteState != NULL) - { - elog(LOG, "flushing write state for relation %d", - CStoreWriteState->relation->rd_id); - CStoreEndWrite(CStoreWriteState); - CStoreWriteState = 
NULL; - } -} - - static List * RelationColumnList(Relation rel) { @@ -263,9 +200,10 @@ cstore_beginscan_extended(Relation relation, Snapshot snapshot, uint32 flags, Bitmapset *attr_needed, List *scanQual) { TupleDesc tupdesc = relation->rd_att; + Oid relfilenode = relation->rd_node.relNode; CStoreScanDesc scan = palloc(sizeof(CStoreScanDescData)); List *neededColumnList = NIL; - MemoryContext oldContext = MemoryContextSwitchTo(GetCStoreMemoryContext()); + MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext); ListCell *columnCell = NULL; scan->cs_base.rs_rd = relation; @@ -275,6 +213,15 @@ cstore_beginscan_extended(Relation relation, Snapshot snapshot, scan->cs_base.rs_flags = flags; scan->cs_base.rs_parallel = parallel_scan; + if (PendingWritesInUpperTransactions(relfilenode, GetCurrentSubTransactionId())) + { + elog(ERROR, + "cannot read from table when there is unflushed data in upper transactions"); + } + + FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId()); + + List *columnList = RelationColumnList(relation); /* only collect columns that we need for the scan */ @@ -319,7 +266,7 @@ static bool cstore_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot) { CStoreScanDesc scan = (CStoreScanDesc) sscan; - MemoryContext oldContext = MemoryContextSwitchTo(GetCStoreMemoryContext()); + MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext); ExecClearTuple(slot); @@ -437,9 +384,9 @@ static void cstore_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid, int options, BulkInsertState bistate) { - MemoryContext oldContext = MemoryContextSwitchTo(GetCStoreMemoryContext()); - - cstore_init_write_state(relation); + TableWriteState *writeState = cstore_init_write_state(relation->rd_node, + RelationGetDescr(relation), + GetCurrentSubTransactionId()); HeapTuple heapTuple = ExecCopySlotHeapTuple(slot); if (HeapTupleHasExternal(heapTuple)) @@ -453,8 +400,7 @@ 
cstore_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid, slot_getallattrs(slot); - CStoreWriteRow(CStoreWriteState, slot->tts_values, slot->tts_isnull); - MemoryContextSwitchTo(oldContext); + CStoreWriteRow(writeState, slot->tts_values, slot->tts_isnull); } @@ -479,9 +425,9 @@ static void cstore_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, CommandId cid, int options, BulkInsertState bistate) { - MemoryContext oldContext = MemoryContextSwitchTo(GetCStoreMemoryContext()); - - cstore_init_write_state(relation); + TableWriteState *writeState = cstore_init_write_state(relation->rd_node, + RelationGetDescr(relation), + GetCurrentSubTransactionId()); for (int i = 0; i < ntuples; i++) { @@ -499,9 +445,8 @@ cstore_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, slot_getallattrs(tupleSlot); - CStoreWriteRow(CStoreWriteState, tupleSlot->tts_values, tupleSlot->tts_isnull); + CStoreWriteRow(writeState, tupleSlot->tts_values, tupleSlot->tts_isnull); } - MemoryContextSwitchTo(oldContext); } @@ -537,11 +482,9 @@ cstore_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot, static void cstore_finish_bulk_insert(Relation relation, int options) { - /*TODO: flush relation like for heap? */ - /* free write state or only in ExecutorEnd_hook? */ - - /* for COPY */ - cstore_free_write_state(); + /* + * Nothing to do here. We keep write states live until transaction end. 
+ */ } @@ -556,6 +499,9 @@ cstore_relation_set_new_filenode(Relation rel, uint64 blockRowCount = 0; uint64 stripeRowCount = 0; CompressionType compression = 0; + Oid oldRelfilenode = rel->rd_node.relNode; + + MarkRelfilenodeDropped(oldRelfilenode, GetCurrentSubTransactionId()); if (metadata != NULL) { @@ -589,7 +535,10 @@ cstore_relation_set_new_filenode(Relation rel, static void cstore_relation_nontransactional_truncate(Relation rel) { - DataFileMetadata *metadata = ReadDataFileMetadata(rel->rd_node.relNode, false); + Oid relfilenode = rel->rd_node.relNode; + DataFileMetadata *metadata = ReadDataFileMetadata(relfilenode, false); + + NonTransactionDropWriteState(relfilenode); /* * No need to set new relfilenode, since the table was created in this @@ -651,23 +600,23 @@ cstore_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap, * relation first. */ - CStoreOptions *cstoreOptions = CStoreTableAMGetOptions(OldHeap); + CStoreOptions *cstoreOptions = CStoreTableAMGetOptions(OldHeap->rd_node.relNode); UpdateCStoreDataFileMetadata(NewHeap->rd_node.relNode, cstoreOptions->blockRowCount, cstoreOptions->stripeRowCount, cstoreOptions->compressionType); - cstoreOptions = CStoreTableAMGetOptions(NewHeap); + cstoreOptions = CStoreTableAMGetOptions(NewHeap->rd_node.relNode); - TableWriteState *writeState = CStoreBeginWrite(NewHeap, + TableWriteState *writeState = CStoreBeginWrite(NewHeap->rd_node, cstoreOptions->compressionType, cstoreOptions->stripeRowCount, cstoreOptions->blockRowCount, targetDesc); - TableReadState *readState = CStoreBeginRead(OldHeap, sourceDesc, RelationColumnList( - OldHeap), NULL); + TableReadState *readState = CStoreBeginRead(OldHeap, sourceDesc, + RelationColumnList(OldHeap), NULL); Datum *values = palloc0(sourceDesc->natts * sizeof(Datum)); bool *nulls = palloc0(sourceDesc->natts * sizeof(bool)); @@ -1046,18 +995,61 @@ cstore_scan_sample_next_tuple(TableScanDesc scan, SampleScanState *scanstate, static void -CStoreExecutorEnd(QueryDesc 
*queryDesc) +CStoreXactCallback(XactEvent event, void *arg) { - cstore_free_write_state(); - if (PreviousExecutorEndHook) + switch (event) { - PreviousExecutorEndHook(queryDesc); + case XACT_EVENT_COMMIT: + case XACT_EVENT_PARALLEL_COMMIT: + case XACT_EVENT_PREPARE: + { + /* nothing to do */ + break; + } + + case XACT_EVENT_ABORT: + case XACT_EVENT_PARALLEL_ABORT: + { + DiscardWriteStateForAllRels(GetCurrentSubTransactionId(), 0); + break; + } + + case XACT_EVENT_PRE_COMMIT: + case XACT_EVENT_PARALLEL_PRE_COMMIT: + case XACT_EVENT_PRE_PREPARE: + { + FlushWriteStateForAllRels(GetCurrentSubTransactionId(), 0); + break; + } } - else +} + + +static void +CStoreSubXactCallback(SubXactEvent event, SubTransactionId mySubid, + SubTransactionId parentSubid, void *arg) +{ + switch (event) { - standard_ExecutorEnd(queryDesc); + case SUBXACT_EVENT_START_SUB: + case SUBXACT_EVENT_COMMIT_SUB: + { + /* nothing to do */ + break; + } + + case SUBXACT_EVENT_ABORT_SUB: + { + DiscardWriteStateForAllRels(mySubid, parentSubid); + break; + } + + case SUBXACT_EVENT_PRE_COMMIT_SUB: + { + FlushWriteStateForAllRels(mySubid, parentSubid); + break; + } } - ResetCStoreMemoryContext(); } @@ -1109,12 +1101,13 @@ CStoreTableAMProcessUtility(PlannedStmt * plannedStatement, void cstore_tableam_init() { - PreviousExecutorEndHook = ExecutorEnd_hook; - ExecutorEnd_hook = CStoreExecutorEnd; + RegisterXactCallback(CStoreXactCallback, NULL); + RegisterSubXactCallback(CStoreSubXactCallback, NULL); + PreviousProcessUtilityHook = (ProcessUtility_hook != NULL) ? 
ProcessUtility_hook : standard_ProcessUtility; ProcessUtility_hook = CStoreTableAMProcessUtility; - prevObjectAccessHook = object_access_hook; + PrevObjectAccessHook = object_access_hook; object_access_hook = CStoreTableAMObjectAccessHook; cstore_customscan_init(); @@ -1124,7 +1117,7 @@ cstore_tableam_init() void cstore_tableam_finish() { - ExecutorEnd_hook = PreviousExecutorEndHook; + object_access_hook = PrevObjectAccessHook; } @@ -1140,9 +1133,9 @@ CStoreTableAMObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId subId, void *arg) { - if (prevObjectAccessHook) + if (PrevObjectAccessHook) { - prevObjectAccessHook(access, classId, objectId, subId, arg); + PrevObjectAccessHook(access, classId, objectId, subId, arg); } /* @@ -1166,7 +1159,10 @@ CStoreTableAMObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId * tableam tables storage is managed by postgres. */ Relation rel = table_open(objectId, AccessExclusiveLock); - DeleteDataFileMetadataRowIfExists(rel->rd_node.relNode); + Oid relfilenode = rel->rd_node.relNode; + DeleteDataFileMetadataRowIfExists(relfilenode); + + MarkRelfilenodeDropped(relfilenode, GetCurrentSubTransactionId()); /* keep the lock since we did physical changes to the relation */ table_close(rel, NoLock); diff --git a/src/backend/columnar/cstore_writer.c b/src/backend/columnar/cstore_writer.c index 735024fbf..75629241d 100644 --- a/src/backend/columnar/cstore_writer.c +++ b/src/backend/columnar/cstore_writer.c @@ -18,6 +18,7 @@ #include "safe_lib.h" +#include "access/heapam.h" #include "access/nbtree.h" #include "catalog/pg_am.h" #include "miscadmin.h" @@ -25,6 +26,7 @@ #include "storage/smgr.h" #include "utils/memutils.h" #include "utils/rel.h" +#include "utils/relfilenodemap.h" #include "columnar/cstore.h" #include "columnar/cstore_version_compat.h" @@ -58,7 +60,7 @@ static StringInfo CopyStringInfo(StringInfo sourceString); * will be added. 
*/ TableWriteState * -CStoreBeginWrite(Relation relation, +CStoreBeginWrite(RelFileNode relfilenode, CompressionType compressionType, uint64 stripeMaxRowCount, uint32 blockRowCount, TupleDesc tupleDescriptor) @@ -101,11 +103,11 @@ CStoreBeginWrite(Relation relation, blockRowCount); TableWriteState *writeState = palloc0(sizeof(TableWriteState)); - writeState->relation = relation; + writeState->relfilenode = relfilenode; writeState->compressionType = compressionType; writeState->stripeMaxRowCount = stripeMaxRowCount; writeState->blockRowCount = blockRowCount; - writeState->tupleDescriptor = tupleDescriptor; + writeState->tupleDescriptor = CreateTupleDescCopy(tupleDescriptor); writeState->comparisonFunctionArray = comparisonFunctionArray; writeState->stripeBuffers = NULL; writeState->stripeSkipList = NULL; @@ -205,11 +207,7 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul stripeBuffers->rowCount++; if (stripeBuffers->rowCount >= writeState->stripeMaxRowCount) { - FlushStripe(writeState); - - /* set stripe data and skip list to NULL so they are recreated next time */ - writeState->stripeBuffers = NULL; - writeState->stripeSkipList = NULL; + CStoreFlushPendingWrites(writeState); } MemoryContextSwitchTo(oldContext); @@ -225,17 +223,7 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul void CStoreEndWrite(TableWriteState *writeState) { - StripeBuffers *stripeBuffers = writeState->stripeBuffers; - - if (stripeBuffers != NULL) - { - MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext); - - FlushStripe(writeState); - MemoryContextReset(writeState->stripeWriteContext); - - MemoryContextSwitchTo(oldContext); - } + CStoreFlushPendingWrites(writeState); MemoryContextDelete(writeState->stripeWriteContext); pfree(writeState->comparisonFunctionArray); @@ -244,6 +232,25 @@ CStoreEndWrite(TableWriteState *writeState) } +void +CStoreFlushPendingWrites(TableWriteState *writeState) +{ + 
StripeBuffers *stripeBuffers = writeState->stripeBuffers; + if (stripeBuffers != NULL) + { + MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext); + + FlushStripe(writeState); + + /* set stripe data and skip list to NULL so they are recreated next time */ + writeState->stripeBuffers = NULL; + writeState->stripeSkipList = NULL; + + MemoryContextSwitchTo(oldContext); + } +} + + /* * CreateEmptyStripeBuffers allocates an empty StripeBuffers structure with the given * column count. @@ -410,6 +417,10 @@ FlushStripe(TableWriteState *writeState) uint64 stripeSize = 0; uint64 stripeRowCount = 0; + Oid relationId = RelidByRelfilenode(writeState->relfilenode.spcNode, + writeState->relfilenode.relNode); + Relation relation = relation_open(relationId, NoLock); + /* * check if the last block needs serialization , the last block was not serialized * if it was not full yet, e.g. (rowCount > 0) @@ -459,7 +470,7 @@ FlushStripe(TableWriteState *writeState) stripeSkipList->blockSkipNodeArray[0][blockIndex].rowCount; } - stripeMetadata = ReserveStripe(writeState->relation, stripeSize, + stripeMetadata = ReserveStripe(relation, stripeSize, stripeRowCount, columnCount, blockCount, blockRowCount); @@ -486,7 +497,7 @@ FlushStripe(TableWriteState *writeState) columnBuffers->blockBuffersArray[blockIndex]; StringInfo existsBuffer = blockBuffers->existsBuffer; - WriteToSmgr(writeState->relation, currentFileOffset, + WriteToSmgr(relation, currentFileOffset, existsBuffer->data, existsBuffer->len); currentFileOffset += existsBuffer->len; } @@ -497,16 +508,18 @@ FlushStripe(TableWriteState *writeState) columnBuffers->blockBuffersArray[blockIndex]; StringInfo valueBuffer = blockBuffers->valueBuffer; - WriteToSmgr(writeState->relation, currentFileOffset, + WriteToSmgr(relation, currentFileOffset, valueBuffer->data, valueBuffer->len); currentFileOffset += valueBuffer->len; } } /* create skip list and footer buffers */ - 
SaveStripeSkipList(writeState->relation->rd_node.relNode, + SaveStripeSkipList(relation->rd_node.relNode, stripeMetadata.id, stripeSkipList, tupleDescriptor); + + relation_close(relation, NoLock); } @@ -747,3 +760,10 @@ CopyStringInfo(StringInfo sourceString) return targetString; } + + +bool +ContainsPendingWrites(TableWriteState *state) +{ + return state->stripeBuffers != NULL && state->stripeBuffers->rowCount != 0; +} diff --git a/src/backend/columnar/write_state_management.c b/src/backend/columnar/write_state_management.c new file mode 100644 index 000000000..067d2199c --- /dev/null +++ b/src/backend/columnar/write_state_management.c @@ -0,0 +1,384 @@ + +#include "citus_version.h" +#if HAS_TABLEAM + +#include "postgres.h" + +#include <math.h> + +#include "miscadmin.h" + +#include "access/genam.h" +#include "access/heapam.h" +#include "access/multixact.h" +#include "access/rewriteheap.h" +#include "access/tsmapi.h" +#if PG_VERSION_NUM >= 130000 +#include "access/heaptoast.h" +#else +#include "access/tuptoaster.h" +#endif +#include "access/xact.h" +#include "catalog/catalog.h" +#include "catalog/index.h" +#include "catalog/objectaccess.h" +#include "catalog/pg_am.h" +#include "catalog/pg_trigger.h" +#include "catalog/storage.h" +#include "catalog/storage_xlog.h" +#include "commands/progress.h" +#include "commands/vacuum.h" +#include "executor/executor.h" +#include "nodes/makefuncs.h" +#include "optimizer/plancat.h" +#include "pgstat.h" +#include "storage/bufmgr.h" +#include "storage/bufpage.h" +#include "storage/bufmgr.h" +#include "storage/lmgr.h" +#include "storage/predicate.h" +#include "storage/procarray.h" +#include "storage/smgr.h" +#include "tcop/utility.h" +#include "utils/builtins.h" +#include "utils/pg_rusage.h" +#include "utils/rel.h" +#include "utils/syscache.h" + +#include "columnar/cstore.h" +#include "columnar/cstore_customscan.h" +#include "columnar/cstore_tableam.h" +#include "columnar/cstore_version_compat.h" + + +/* + * Mapping from relfilenode to
WriteStateMapEntry. This keeps write state for + * each relation. + */ +static HTAB *WriteStateMap = NULL; + +/* memory context for allocating WriteStateMap & all write states */ +static MemoryContext WriteStateContext = NULL; + +/* + * Each member of the writeStateStack in WriteStateMapEntry. This means that + * we did some inserts in the subtransaction subXid, and the state of those + * inserts is stored at writeState. Those writes can be flushed or unflushed. + */ +typedef struct SubXidWriteState +{ + SubTransactionId subXid; + TableWriteState *writeState; + + struct SubXidWriteState *next; +} SubXidWriteState; + + +/* + * An entry in WriteStateMap. + */ +typedef struct WriteStateMapEntry +{ + /* key of the entry */ + Oid relfilenode; + + /* + * If a table is dropped, we set dropped to true and set dropSubXid to the + * id of the subtransaction in which the drop happened. + */ + bool dropped; + SubTransactionId dropSubXid; + + /* + * Stack of SubXidWriteState where first element is top of the stack. When + * inserts happen, we look at top of the stack. If top of stack belongs to + * current subtransaction, we forward writes to its writeState. Otherwise, + * we create a new stack entry for current subtransaction and push it to + * the stack, and forward writes to that. + */ + SubXidWriteState *writeStateStack; +} WriteStateMapEntry; + + +/* + * Memory context reset callback so we reset WriteStateMap to NULL at the end + * of transaction. WriteStateMap is allocated in WriteStateContext, so its + * leaked reference would dangle once that context is reset or deleted. + */ +static MemoryContextCallback cleanupCallback; +static void +CleanupWriteStateMap(void *arg) +{ + WriteStateMap = NULL; + WriteStateContext = NULL; +} + + +TableWriteState * +cstore_init_write_state(RelFileNode relfilenode, TupleDesc tupdesc, + SubTransactionId currentSubXid) +{ + bool found; + + /* + * If this is the first call in current transaction, allocate the hash + * table.
+ */ + if (WriteStateMap == NULL) + { + WriteStateContext = + AllocSetContextCreate( + TopTransactionContext, + "Column Store Write State Management Context", + ALLOCSET_DEFAULT_SIZES); + HASHCTL info; + uint32 hashFlags = (HASH_ELEM | HASH_CONTEXT); + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(Oid); + info.entrysize = sizeof(WriteStateMapEntry); + info.hcxt = WriteStateContext; + + WriteStateMap = hash_create("column store write state map", + 64, &info, hashFlags); + + cleanupCallback.arg = NULL; + cleanupCallback.func = &CleanupWriteStateMap; + cleanupCallback.next = NULL; + MemoryContextRegisterResetCallback(WriteStateContext, &cleanupCallback); + } + + WriteStateMapEntry *hashEntry = hash_search(WriteStateMap, &relfilenode.relNode, + HASH_ENTER, &found); + if (!found) + { + hashEntry->writeStateStack = NULL; + hashEntry->dropped = false; + } + + Assert(!hashEntry->dropped); + + /* + * If top of stack belongs to the current subtransaction, return its + * writeState, ... + */ + if (hashEntry->writeStateStack != NULL) + { + SubXidWriteState *stackHead = hashEntry->writeStateStack; + + if (stackHead->subXid == currentSubXid) + { + return stackHead->writeState; + } + } + + /* + * ... otherwise we need to create a new stack entry for the current + * subtransaction. + */ + MemoryContext oldContext = MemoryContextSwitchTo(WriteStateContext); + + CStoreOptions *cstoreOptions = CStoreTableAMGetOptions(relfilenode.relNode); + SubXidWriteState *stackEntry = palloc0(sizeof(SubXidWriteState)); + stackEntry->writeState = CStoreBeginWrite(relfilenode, + cstoreOptions->compressionType, + cstoreOptions->stripeRowCount, + cstoreOptions->blockRowCount, + tupdesc); + stackEntry->subXid = currentSubXid; + stackEntry->next = hashEntry->writeStateStack; + hashEntry->writeStateStack = stackEntry; + + MemoryContextSwitchTo(oldContext); + + return stackEntry->writeState; +} + + +/* + * Flushes pending writes for given relfilenode in the given subtransaction. 
+ */ +void +FlushWriteStateForRelfilenode(Oid relfilenode, SubTransactionId currentSubXid) +{ + WriteStateMapEntry *entry = NULL; /* stays NULL when no map exists yet */ + bool found = false; + + if (WriteStateMap) + { + entry = hash_search(WriteStateMap, &relfilenode, HASH_FIND, &found); + } + + Assert(!found || !entry->dropped); + + if (found && entry->writeStateStack != NULL) + { + SubXidWriteState *stackEntry = entry->writeStateStack; + if (stackEntry->subXid == currentSubXid) + { + CStoreFlushPendingWrites(stackEntry->writeState); + } + } +} + + +/* + * Helper function for FlushWriteStateForAllRels and DiscardWriteStateForAllRels. + * Pops all write states for current subtransaction, and depending on "commit" + * either flushes them or discards them. This also takes into account dropped + * tables, and either propagates the dropped flag to parent subtransaction or + * rolls back the drop. + */ +static void +PopWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId parentSubXid, + bool commit) +{ + HASH_SEQ_STATUS status; + WriteStateMapEntry *entry; + + if (WriteStateMap == NULL) + { + return; + } + + hash_seq_init(&status, WriteStateMap); + while ((entry = hash_seq_search(&status)) != 0) + { + if (entry->writeStateStack == NULL) + { + continue; + } + + /* + * If the table has been dropped in current subtransaction, either + * commit the drop or roll it back. + */ + if (entry->dropped) + { + if (entry->dropSubXid == currentSubXid) + { + if (commit) + { + /* elevate drop to the upper subtransaction */ + entry->dropSubXid = parentSubXid; + } + else + { + /* abort the drop */ + entry->dropped = false; + } + } + } + + /* + * Otherwise, commit or discard pending writes. + */ + else + { + SubXidWriteState *stackHead = entry->writeStateStack; + if (stackHead->subXid == currentSubXid) + { + if (commit) + { + CStoreEndWrite(stackHead->writeState); + } + + entry->writeStateStack = stackHead->next; + } + } + } +} + + +/* + * Called from the pre-commit (or pre-prepare) callbacks of the current (sub)transaction.
+ */ +void +FlushWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId parentSubXid) +{ + PopWriteStateForAllRels(currentSubXid, parentSubXid, true); +} + + +/* + * Called when the current (sub)transaction is aborted. + */ +void +DiscardWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId parentSubXid) +{ + PopWriteStateForAllRels(currentSubXid, parentSubXid, false); +} + + +/* + * Called when the given relfilenode is dropped; records the drop and its subtransaction. + */ +void +MarkRelfilenodeDropped(Oid relfilenode, SubTransactionId currentSubXid) +{ + bool found = false; + + if (WriteStateMap == NULL) + { + return; + } + + WriteStateMapEntry *entry = hash_search(WriteStateMap, &relfilenode, HASH_FIND, + &found); + if (!found || entry->dropped) + { + return; + } + + entry->dropped = true; + entry->dropSubXid = currentSubXid; +} + + +/* + * Called when the given relfilenode is dropped in non-transactional TRUNCATE; removes its map entry. + */ +void +NonTransactionDropWriteState(Oid relfilenode) +{ + if (WriteStateMap) + { + hash_search(WriteStateMap, &relfilenode, HASH_REMOVE, NULL); + } +} + + +/* + * Returns true if there are any pending writes in upper transactions.
+ */ +bool +PendingWritesInUpperTransactions(Oid relfilenode, SubTransactionId currentSubXid) +{ + WriteStateMapEntry *entry = NULL; /* stays NULL when no map exists yet */ + bool found = false; + + if (WriteStateMap) + { + entry = hash_search(WriteStateMap, &relfilenode, HASH_FIND, &found); + } + + if (found && entry->writeStateStack != NULL) + { + SubXidWriteState *stackEntry = entry->writeStateStack; + + while (stackEntry != NULL) + { + if (stackEntry->subXid != currentSubXid && + ContainsPendingWrites(stackEntry->writeState)) + { + return true; + } + + stackEntry = stackEntry->next; + } + } + + return false; +} + + +#endif diff --git a/src/include/columnar/cstore.h b/src/include/columnar/cstore.h index ff0051b95..edfe65efe 100644 --- a/src/include/columnar/cstore.h +++ b/src/include/columnar/cstore.h @@ -20,6 +20,7 @@ #include "nodes/parsenodes.h" #include "storage/bufpage.h" #include "storage/lockdefs.h" +#include "storage/relfilenode.h" #include "utils/relcache.h" #include "utils/snapmgr.h" @@ -224,7 +225,7 @@ typedef struct TableWriteState CompressionType compressionType; TupleDesc tupleDescriptor; FmgrInfo **comparisonFunctionArray; - Relation relation; + RelFileNode relfilenode; MemoryContext stripeWriteContext; StripeBuffers *stripeBuffers; @@ -251,14 +252,16 @@ extern void cstore_init(void); extern CompressionType ParseCompressionType(const char *compressionTypeString); /* Function declarations for writing to a cstore file */ -extern TableWriteState * CStoreBeginWrite(Relation relation, +extern TableWriteState * CStoreBeginWrite(RelFileNode relfilenode, CompressionType compressionType, uint64 stripeMaxRowCount, uint32 blockRowCount, TupleDesc tupleDescriptor); extern void CStoreWriteRow(TableWriteState *state, Datum *columnValues, bool *columnNulls); +extern void CStoreFlushPendingWrites(TableWriteState *state); extern void CStoreEndWrite(TableWriteState *state); +extern bool ContainsPendingWrites(TableWriteState *state); /* Function declarations for reading from a cstore file */ extern
TableReadState * CStoreBeginRead(Relation relation, @@ -281,6 +284,7 @@ extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer, CompressionType compressionType); extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType); extern char * CompressionTypeStr(CompressionType type); +extern CStoreOptions * CStoreTableAMGetOptions(Oid relfilenode); /* cstore_metadata_tables.c */ extern void DeleteDataFileMetadataRowIfExists(Oid relfilenode); @@ -300,6 +304,22 @@ extern StripeSkipList * ReadStripeSkipList(Oid relfilenode, uint64 stripe, TupleDesc tupleDescriptor, uint32 blockCount); + +/* write_state_management.c */ +extern TableWriteState * cstore_init_write_state(RelFileNode relfilenode, TupleDesc + tupdesc, + SubTransactionId currentSubXid); +extern void FlushWriteStateForRelfilenode(Oid relfilenode, SubTransactionId + currentSubXid); +extern void FlushWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId + parentSubXid); +extern void DiscardWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId + parentSubXid); +extern void MarkRelfilenodeDropped(Oid relfilenode, SubTransactionId currentSubXid); +extern void NonTransactionDropWriteState(Oid relfilenode); +extern bool PendingWritesInUpperTransactions(Oid relfilenode, + SubTransactionId currentSubXid); + typedef struct SmgrAddr { BlockNumber blockno; diff --git a/src/test/regress/columnar_am_schedule b/src/test/regress/columnar_am_schedule index 70ae97db3..3a4a46270 100644 --- a/src/test/regress/columnar_am_schedule +++ b/src/test/regress/columnar_am_schedule @@ -16,3 +16,5 @@ test: am_block_filtering test: am_join test: am_trigger test: am_tableoptions +test: am_recursive +test: am_transactions diff --git a/src/test/regress/expected/am_recursive.out b/src/test/regress/expected/am_recursive.out new file mode 100644 index 000000000..eb90dda47 --- /dev/null +++ b/src/test/regress/expected/am_recursive.out @@ -0,0 +1,252 @@ +CREATE TABLE t1(a 
int, b int) USING cstore_tableam; +CREATE TABLE t2(a int, b int) USING cstore_tableam; +CREATE FUNCTION f(x INT) RETURNS INT AS $$ + INSERT INTO t1 VALUES(x, x * 2) RETURNING b - 1; +$$ LANGUAGE SQL; +-- +-- Following query will start a write to t1 before finishing +-- write to t2, so it tests that we handle recursive writes +-- correctly. +-- +INSERT INTO t2 SELECT i, f(i) FROM generate_series(1, 5) i; +-- there are no subtransactions, so above statement should batch +-- INSERTs inside the UDF and create one stripe per table. +SELECT relname, count(*) FROM cstore.cstore_stripes a, pg_class b +WHERE a.relfilenode=b.relfilenode AND relname IN ('t1', 't2') +GROUP BY relname +ORDER BY relname; + relname | count +--------------------------------------------------------------------- + t1 | 1 + t2 | 1 +(2 rows) + +SELECT * FROM t1 ORDER BY a; + a | b +--------------------------------------------------------------------- + 1 | 2 + 2 | 4 + 3 | 6 + 4 | 8 + 5 | 10 +(5 rows) + +SELECT * FROM t2 ORDER BY a; + a | b +--------------------------------------------------------------------- + 1 | 1 + 2 | 3 + 3 | 5 + 4 | 7 + 5 | 9 +(5 rows) + +TRUNCATE t1; +TRUNCATE t2; +DROP FUNCTION f(INT); +-- +-- Test the case when 2 writes are going on concurrently in the +-- same executor, and those 2 writes are dependent. +-- +WITH t AS ( + INSERT INTO t1 SELECT i, 2*i FROM generate_series(1, 5) i RETURNING * +) +INSERT INTO t2 SELECT t.a, t.a+1 FROM t; +SELECT * FROM t1; + a | b +--------------------------------------------------------------------- + 1 | 2 + 2 | 4 + 3 | 6 + 4 | 8 + 5 | 10 +(5 rows) + +SELECT * FROM t2; + a | b +--------------------------------------------------------------------- + 1 | 2 + 2 | 3 + 3 | 4 + 4 | 5 + 5 | 6 +(5 rows) + +TRUNCATE t1; +TRUNCATE t2; +-- +-- Test the case when there are 2 independent inserts in a CTE. +-- Also tests the case where some of the tuple_inserts happen in +-- ExecutorFinish() instead of ExecutorRun(). 
+-- +WITH t AS ( + INSERT INTO t1 SELECT i, 2*i FROM generate_series(1, 5) i RETURNING * +) +INSERT INTO t2 SELECT i, (select count(*) from t1) FROM generate_series(1, 3) i; +SELECT * FROM t1; + a | b +--------------------------------------------------------------------- + 1 | 2 + 2 | 4 + 3 | 6 + 4 | 8 + 5 | 10 +(5 rows) + +SELECT * FROM t2; + a | b +--------------------------------------------------------------------- + 1 | 0 + 2 | 0 + 3 | 0 +(3 rows) + +TRUNCATE t1; +TRUNCATE t2; +-- +-- double insert on the same relation +-- +WITH t AS ( + INSERT INTO t1 SELECT i, 2*i FROM generate_series(1, 5) i RETURNING * +) +INSERT INTO t1 SELECT t.a, t.a+1 FROM t; +SELECT * FROM t1 ORDER BY a, b; + a | b +--------------------------------------------------------------------- + 1 | 2 + 1 | 2 + 2 | 3 + 2 | 4 + 3 | 4 + 3 | 6 + 4 | 5 + 4 | 8 + 5 | 6 + 5 | 10 +(10 rows) + +TRUNCATE t1; +TRUNCATE t2; +-- +-- A test where result of a UDF call will depend on execution +-- of previous UDF calls. +-- +CREATE FUNCTION g(x INT) RETURNS INT AS $$ + INSERT INTO t1 VALUES(x, x * 2); + SELECT count(*)::int FROM t1; +$$ LANGUAGE SQL; +-- t3 and t4 are heap tables to help with cross-checking results +CREATE TABLE t3(a int, b int); +CREATE TABLE t4(a int, b int); +CREATE FUNCTION g2(x INT) RETURNS INT AS $$ + INSERT INTO t3 VALUES(x, x * 2); + SELECT count(*)::int FROM t3; +$$ LANGUAGE SQL; +INSERT INTO t2 SELECT i, g(i) FROM generate_series(1, 5) i; +INSERT INTO t4 SELECT i, g2(i) FROM generate_series(1, 5) i; +-- check that t1==t3 and t2==t4. 
+((table t1) except (table t3)) union ((table t3) except (table t1)); + a | b +--------------------------------------------------------------------- +(0 rows) + +((table t2) except (table t4)) union ((table t4) except (table t2)); + a | b +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM t2 ORDER BY a, b; + a | b +--------------------------------------------------------------------- + 1 | 1 + 2 | 2 + 3 | 3 + 4 | 4 + 5 | 5 +(5 rows) + +TRUNCATE t1, t2, t3, t4; +-- +-- INSERT into the same relation that was INSERTed into in the UDF +-- +INSERT INTO t1 SELECT i, g(i) FROM generate_series(1, 3) i; +INSERT INTO t3 SELECT i, g2(i) FROM generate_series(1, 3) i; +SELECT * FROM t1 ORDER BY a, b; + a | b +--------------------------------------------------------------------- + 1 | 1 + 1 | 2 + 2 | 3 + 2 | 4 + 3 | 5 + 3 | 6 +(6 rows) + +SELECT * FROM t3 ORDER BY a, b; + a | b +--------------------------------------------------------------------- + 1 | 1 + 1 | 2 + 2 | 3 + 2 | 4 + 3 | 5 + 3 | 6 +(6 rows) + +-- check that t1==t3 and t2==t4. +((table t1) except (table t3)) union ((table t3) except (table t1)); + a | b +--------------------------------------------------------------------- +(0 rows) + +((table t2) except (table t4)) union ((table t4) except (table t2)); + a | b +--------------------------------------------------------------------- +(0 rows) + +DROP FUNCTION g(int), g2(int); +TRUNCATE t1, t2, t3, t4; +-- +-- EXCEPTION in plpgsql, which is implemented internally using +-- subtransactions. plpgsql uses SPI to execute INSERT statements. 
+-- +CREATE FUNCTION f(a int) RETURNS VOID AS $$ +DECLARE + x int; +BEGIN + INSERT INTO t1 SELECT i, i + 1 FROM generate_series(a, a + 1) i; + x := 10 / a; + INSERT INTO t1 SELECT i, i * 2 FROM generate_series(a + 2, a + 3) i; +EXCEPTION WHEN division_by_zero THEN + INSERT INTO t1 SELECT i, i + 1 FROM generate_series(a + 2, a + 3) i; +END; +$$ LANGUAGE plpgsql; +SELECT f(10); + f +--------------------------------------------------------------------- + +(1 row) + +SELECT f(0), f(20); + f | f +--------------------------------------------------------------------- + | +(1 row) + +SELECT * FROM t1 ORDER BY a, b; + a | b +--------------------------------------------------------------------- + 2 | 3 + 3 | 4 + 10 | 11 + 11 | 12 + 12 | 24 + 13 | 26 + 20 | 21 + 21 | 22 + 22 | 44 + 23 | 46 +(10 rows) + +DROP FUNCTION f(int); +DROP TABLE t1, t2, t3, t4; diff --git a/src/test/regress/expected/am_rollback.out b/src/test/regress/expected/am_rollback.out index e58ef5135..a4c069377 100644 --- a/src/test/regress/expected/am_rollback.out +++ b/src/test/regress/expected/am_rollback.out @@ -37,6 +37,12 @@ WHERE a.relfilenode = b.relfilenode AND b.relname = 't'; BEGIN; SAVEPOINT s0; INSERT INTO t SELECT i, i+1 FROM generate_series(1, 10) i; +SELECT count(*) FROM t; -- force flush + count +--------------------------------------------------------------------- + 20 +(1 row) + SAVEPOINT s1; INSERT INTO t SELECT i, i+1 FROM generate_series(1, 10) i; SELECT count(*) FROM t; diff --git a/src/test/regress/expected/am_transactions.out b/src/test/regress/expected/am_transactions.out new file mode 100644 index 000000000..1410272dd --- /dev/null +++ b/src/test/regress/expected/am_transactions.out @@ -0,0 +1,142 @@ +-- +-- Testing we handle transactions properly +-- +CREATE TABLE t(a int, b int) USING cstore_tableam; +INSERT INTO t SELECT i, 2 * i FROM generate_series(1, 3) i; +SELECT * FROM t ORDER BY a; + a | b +--------------------------------------------------------------------- + 1 | 2 + 2 | 4 
+ 3 | 6 +(3 rows) + +-- verify that table rewrites work properly +BEGIN; +ALTER TABLE t ALTER COLUMN b TYPE float4 USING (b + 0.5)::float4; +INSERT INTO t VALUES (4, 8.5); +SELECT * FROM t ORDER BY a; + a | b +--------------------------------------------------------------------- + 1 | 2.5 + 2 | 4.5 + 3 | 6.5 + 4 | 8.5 +(4 rows) + +ROLLBACK; +SELECT * FROM t ORDER BY a; + a | b +--------------------------------------------------------------------- + 1 | 2 + 2 | 4 + 3 | 6 +(3 rows) + +-- verify truncate rollback +BEGIN; +TRUNCATE t; +INSERT INTO t VALUES (4, 8); +SELECT * FROM t ORDER BY a; + a | b +--------------------------------------------------------------------- + 4 | 8 +(1 row) + +SAVEPOINT s1; +TRUNCATE t; +SELECT * FROM t ORDER BY a; + a | b +--------------------------------------------------------------------- +(0 rows) + +ROLLBACK TO SAVEPOINT s1; +SELECT * FROM t ORDER BY a; + a | b +--------------------------------------------------------------------- + 4 | 8 +(1 row) + +ROLLBACK; +-- verify truncate with unflushed data in upper xacts +BEGIN; +INSERT INTO t VALUES (4, 8); +SAVEPOINT s1; +TRUNCATE t; +ROLLBACK TO SAVEPOINT s1; +COMMIT; +SELECT * FROM t ORDER BY a; + a | b +--------------------------------------------------------------------- + 1 | 2 + 2 | 4 + 3 | 6 + 4 | 8 +(4 rows) + +-- verify DROP TABLE rollback +BEGIN; +INSERT INTO t VALUES (5, 10); +SELECT * FROM t ORDER BY a; + a | b +--------------------------------------------------------------------- + 1 | 2 + 2 | 4 + 3 | 6 + 4 | 8 + 5 | 10 +(5 rows) + +SAVEPOINT s1; +DROP TABLE t; +SELECT * FROM t ORDER BY a; +ERROR: relation "t" does not exist +ROLLBACK TO SAVEPOINT s1; +SELECT * FROM t ORDER BY a; + a | b +--------------------------------------------------------------------- + 1 | 2 + 2 | 4 + 3 | 6 + 4 | 8 + 5 | 10 +(5 rows) + +ROLLBACK; +-- verify DROP TABLE with unflushed data in upper xacts +BEGIN; +INSERT INTO t VALUES (5, 10); +SAVEPOINT s1; +DROP TABLE t; +SELECT * FROM t ORDER BY a; 
+ERROR: relation "t" does not exist +ROLLBACK TO SAVEPOINT s1; +COMMIT; +SELECT * FROM t ORDER BY a; + a | b +--------------------------------------------------------------------- + 1 | 2 + 2 | 4 + 3 | 6 + 4 | 8 + 5 | 10 +(5 rows) + +-- verify SELECT when unflushed data in upper transactions errors. +BEGIN; +INSERT INTO t VALUES (6, 12); +SAVEPOINT s1; +SELECT * FROM t; +ERROR: cannot read from table when there is unflushed data in upper transactions +ROLLBACK; +SELECT * FROM t ORDER BY a; + a | b +--------------------------------------------------------------------- + 1 | 2 + 2 | 4 + 3 | 6 + 4 | 8 + 5 | 10 +(5 rows) + +DROP TABLE t; diff --git a/src/test/regress/expected/am_trigger.out b/src/test/regress/expected/am_trigger.out index f289b7dad..cd85c96bc 100644 --- a/src/test/regress/expected/am_trigger.out +++ b/src/test/regress/expected/am_trigger.out @@ -74,4 +74,98 @@ NOTICE: (3) CONTEXT: PL/pgSQL function trs_after() line 14 at RAISE NOTICE: (4) CONTEXT: PL/pgSQL function trs_after() line 14 at RAISE +SELECT * FROM test_tr ORDER BY i; + i +--------------------------------------------------------------------- + 1 + 2 + 3 + 4 +(4 rows) + drop table test_tr; +create table test_tr(i int) using cstore_tableam; +-- we should be able to clean-up and continue gracefully if we +-- error out in AFTER STATEMENT triggers. +CREATE SEQUENCE counter START 100; +create or replace function trs_after_erroring() returns trigger language plpgsql as $$ +BEGIN + IF nextval('counter') % 2 = 0 THEN + RAISE EXCEPTION '%', 'error'; + END IF; + RETURN NULL; +END; +$$; +create trigger tr_after_stmt_erroring after insert on test_tr + referencing new table as new_table + for each statement execute procedure trs_after_erroring(); +-- +-- Once upon a time we didn't clean-up properly after erroring out. Here the first +-- statement errors, but the second succeeds. In old times, because of failure in +-- clean-up, both rows were visible. But only the 2nd one should be visible. 
+-- +insert into test_tr values(5); +ERROR: error +CONTEXT: PL/pgSQL function trs_after_erroring() line 4 at RAISE +insert into test_tr values(6); +SELECT * FROM test_tr ORDER BY i; + i +--------------------------------------------------------------------- + 6 +(1 row) + +drop table test_tr; +-- +-- https://github.com/citusdata/cstore2/issues/32 +-- +create table events( + user_id bigint, + event_id bigint, + event_time timestamp default now(), + value float default random()) + PARTITION BY RANGE (event_time); +create table events_p2020_11_04_102965 +PARTITION OF events FOR VALUES FROM ('2020-11-04 00:00:00+01') TO ('2020-11-05 00:00:00+01') +USING cstore_tableam; +create table events_trigger_target( + user_id bigint, + avg float, + __count__ bigint +) USING cstore_tableam; +CREATE OR REPLACE FUNCTION user_value_by_day() + RETURNS trigger + LANGUAGE plpgsql +AS $function$ +BEGIN + IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN + EXECUTE format($exec_format$INSERT INTO %s AS __mat__ SELECT user_id, 0.1 AS avg, pg_catalog.count(*) AS __count__ FROM __ins__ events GROUP BY user_id; + $exec_format$, TG_ARGV[0]); + END IF; + IF (TG_OP = 'DELETE' OR TG_OP = 'UPDATE') THEN + RAISE EXCEPTION $ex$MATERIALIZED VIEW 'user_value_by_day' on table 'events' does not support UPDATE/DELETE$ex$; + END IF; + IF (TG_OP = 'TRUNCATE') THEN + EXECUTE format($exec_format$TRUNCATE TABLE %s; $exec_format$, TG_ARGV[0]); + END IF; + RETURN NULL; +END; +$function$; +create trigger "user_value_by_day_INSERT" AFTER INSERT ON events + REFERENCING NEW TABLE AS __ins__ + FOR EACH STATEMENT EXECUTE FUNCTION user_value_by_day('events_trigger_target'); +COPY events FROM STDIN WITH (FORMAT 'csv'); +SELECT * FROM events ORDER BY user_id; + user_id | event_id | event_time | value +--------------------------------------------------------------------- + 1 | 1 | Wed Nov 04 15:54:02.226999 2020 | 1.1 + 2 | 3 | Wed Nov 04 16:54:02.226999 2020 | 2.2 +(2 rows) + +SELECT * FROM events_trigger_target ORDER BY 
user_id; + user_id | avg | __count__ +--------------------------------------------------------------------- + 1 | 0.1 | 1 + 2 | 0.1 | 1 +(2 rows) + +DROP TABLE events; diff --git a/src/test/regress/expected/am_vacuum_vs_insert.out b/src/test/regress/expected/am_vacuum_vs_insert.out index f5ef08673..a4539a073 100644 --- a/src/test/regress/expected/am_vacuum_vs_insert.out +++ b/src/test/regress/expected/am_vacuum_vs_insert.out @@ -11,7 +11,7 @@ step s1-insert: INSERT INTO test_vacuum_vs_insert SELECT i, 2 * i FROM generate_series(1, 3) i; s2: INFO: statistics for "test_vacuum_vs_insert": -total file size: 24576, total data size: 26 +total file size: 16384, total data size: 26 total row count: 3, stripe count: 1, average rows per stripe: 3 block count: 2, containing data for dropped columns: 0, none compressed: 2, pglz compressed: 0 diff --git a/src/test/regress/sql/am_recursive.sql b/src/test/regress/sql/am_recursive.sql new file mode 100644 index 000000000..1e3d6dc06 --- /dev/null +++ b/src/test/regress/sql/am_recursive.sql @@ -0,0 +1,143 @@ + +CREATE TABLE t1(a int, b int) USING cstore_tableam; +CREATE TABLE t2(a int, b int) USING cstore_tableam; + +CREATE FUNCTION f(x INT) RETURNS INT AS $$ + INSERT INTO t1 VALUES(x, x * 2) RETURNING b - 1; +$$ LANGUAGE SQL; + +-- +-- Following query will start a write to t1 before finishing +-- write to t2, so it tests that we handle recursive writes +-- correctly. +-- +INSERT INTO t2 SELECT i, f(i) FROM generate_series(1, 5) i; + +-- there are no subtransactions, so above statement should batch +-- INSERTs inside the UDF and create one stripe per table. 
+SELECT relname, count(*) FROM cstore.cstore_stripes a, pg_class b +WHERE a.relfilenode=b.relfilenode AND relname IN ('t1', 't2') +GROUP BY relname +ORDER BY relname; + +SELECT * FROM t1 ORDER BY a; +SELECT * FROM t2 ORDER BY a; + +TRUNCATE t1; +TRUNCATE t2; +DROP FUNCTION f(INT); + +-- +-- Test the case when 2 writes are going on concurrently in the +-- same executor, and those 2 writes are dependent. +-- +WITH t AS ( + INSERT INTO t1 SELECT i, 2*i FROM generate_series(1, 5) i RETURNING * +) +INSERT INTO t2 SELECT t.a, t.a+1 FROM t; + +SELECT * FROM t1; +SELECT * FROM t2; + +TRUNCATE t1; +TRUNCATE t2; + +-- +-- Test the case when there are 2 independent inserts in a CTE. +-- Also tests the case where some of the tuple_inserts happen in +-- ExecutorFinish() instead of ExecutorRun(). +-- +WITH t AS ( + INSERT INTO t1 SELECT i, 2*i FROM generate_series(1, 5) i RETURNING * +) +INSERT INTO t2 SELECT i, (select count(*) from t1) FROM generate_series(1, 3) i; + +SELECT * FROM t1; +SELECT * FROM t2; + +TRUNCATE t1; +TRUNCATE t2; + +-- +-- double insert on the same relation +-- +WITH t AS ( + INSERT INTO t1 SELECT i, 2*i FROM generate_series(1, 5) i RETURNING * +) +INSERT INTO t1 SELECT t.a, t.a+1 FROM t; + +SELECT * FROM t1 ORDER BY a, b; + +TRUNCATE t1; +TRUNCATE t2; + +-- +-- A test where result of a UDF call will depend on execution +-- of previous UDF calls. +-- + +CREATE FUNCTION g(x INT) RETURNS INT AS $$ + INSERT INTO t1 VALUES(x, x * 2); + SELECT count(*)::int FROM t1; +$$ LANGUAGE SQL; + +-- t3 and t4 are heap tables to help with cross-checking results +CREATE TABLE t3(a int, b int); +CREATE TABLE t4(a int, b int); + +CREATE FUNCTION g2(x INT) RETURNS INT AS $$ + INSERT INTO t3 VALUES(x, x * 2); + SELECT count(*)::int FROM t3; +$$ LANGUAGE SQL; + +INSERT INTO t2 SELECT i, g(i) FROM generate_series(1, 5) i; +INSERT INTO t4 SELECT i, g2(i) FROM generate_series(1, 5) i; + +-- check that t1==t3 and t2==t4. 
+((table t1) except (table t3)) union ((table t3) except (table t1)); +((table t2) except (table t4)) union ((table t4) except (table t2)); + +SELECT * FROM t2 ORDER BY a, b; + +TRUNCATE t1, t2, t3, t4; + +-- +-- INSERT into the same relation that was INSERTed into in the UDF +-- +INSERT INTO t1 SELECT i, g(i) FROM generate_series(1, 3) i; +INSERT INTO t3 SELECT i, g2(i) FROM generate_series(1, 3) i; +SELECT * FROM t1 ORDER BY a, b; +SELECT * FROM t3 ORDER BY a, b; + +-- check that t1==t3 and t2==t4. +((table t1) except (table t3)) union ((table t3) except (table t1)); +((table t2) except (table t4)) union ((table t4) except (table t2)); + +DROP FUNCTION g(int), g2(int); +TRUNCATE t1, t2, t3, t4; + +-- +-- EXCEPTION in plpgsql, which is implemented internally using +-- subtransactions. plpgsql uses SPI to execute INSERT statements. +-- + +CREATE FUNCTION f(a int) RETURNS VOID AS $$ +DECLARE + x int; +BEGIN + INSERT INTO t1 SELECT i, i + 1 FROM generate_series(a, a + 1) i; + x := 10 / a; + INSERT INTO t1 SELECT i, i * 2 FROM generate_series(a + 2, a + 3) i; +EXCEPTION WHEN division_by_zero THEN + INSERT INTO t1 SELECT i, i + 1 FROM generate_series(a + 2, a + 3) i; +END; +$$ LANGUAGE plpgsql; + +SELECT f(10); +SELECT f(0), f(20); + +SELECT * FROM t1 ORDER BY a, b; + +DROP FUNCTION f(int); +DROP TABLE t1, t2, t3, t4; + diff --git a/src/test/regress/sql/am_rollback.sql b/src/test/regress/sql/am_rollback.sql index da1cc8ce4..bda02152e 100644 --- a/src/test/regress/sql/am_rollback.sql +++ b/src/test/regress/sql/am_rollback.sql @@ -23,6 +23,7 @@ WHERE a.relfilenode = b.relfilenode AND b.relname = 't'; BEGIN; SAVEPOINT s0; INSERT INTO t SELECT i, i+1 FROM generate_series(1, 10) i; +SELECT count(*) FROM t; -- force flush SAVEPOINT s1; INSERT INTO t SELECT i, i+1 FROM generate_series(1, 10) i; SELECT count(*) FROM t; diff --git a/src/test/regress/sql/am_transactions.sql b/src/test/regress/sql/am_transactions.sql new file mode 100644 index 000000000..b84790fb5 --- /dev/null 
+++ b/src/test/regress/sql/am_transactions.sql @@ -0,0 +1,70 @@ +-- +-- Testing we handle transactions properly +-- + +CREATE TABLE t(a int, b int) USING cstore_tableam; + +INSERT INTO t SELECT i, 2 * i FROM generate_series(1, 3) i; +SELECT * FROM t ORDER BY a; + +-- verify that table rewrites work properly +BEGIN; +ALTER TABLE t ALTER COLUMN b TYPE float4 USING (b + 0.5)::float4; +INSERT INTO t VALUES (4, 8.5); +SELECT * FROM t ORDER BY a; +ROLLBACK; + +SELECT * FROM t ORDER BY a; + +-- verify truncate rollback +BEGIN; +TRUNCATE t; +INSERT INTO t VALUES (4, 8); +SELECT * FROM t ORDER BY a; +SAVEPOINT s1; +TRUNCATE t; +SELECT * FROM t ORDER BY a; +ROLLBACK TO SAVEPOINT s1; +SELECT * FROM t ORDER BY a; +ROLLBACK; + +-- verify truncate with unflushed data in upper xacts +BEGIN; +INSERT INTO t VALUES (4, 8); +SAVEPOINT s1; +TRUNCATE t; +ROLLBACK TO SAVEPOINT s1; +COMMIT; + +SELECT * FROM t ORDER BY a; + +-- verify DROP TABLE rollback +BEGIN; +INSERT INTO t VALUES (5, 10); +SELECT * FROM t ORDER BY a; +SAVEPOINT s1; +DROP TABLE t; +SELECT * FROM t ORDER BY a; +ROLLBACK TO SAVEPOINT s1; +SELECT * FROM t ORDER BY a; +ROLLBACK; + +-- verify DROP TABLE with unflushed data in upper xacts +BEGIN; +INSERT INTO t VALUES (5, 10); +SAVEPOINT s1; +DROP TABLE t; +SELECT * FROM t ORDER BY a; +ROLLBACK TO SAVEPOINT s1; +COMMIT; +SELECT * FROM t ORDER BY a; + +-- verify SELECT when unflushed data in upper transactions errors. 
+BEGIN; +INSERT INTO t VALUES (6, 12); +SAVEPOINT s1; +SELECT * FROM t; +ROLLBACK; +SELECT * FROM t ORDER BY a; + +DROP TABLE t; diff --git a/src/test/regress/sql/am_trigger.sql b/src/test/regress/sql/am_trigger.sql index b8a918cf4..3cdd53b44 100644 --- a/src/test/regress/sql/am_trigger.sql +++ b/src/test/regress/sql/am_trigger.sql @@ -58,4 +58,87 @@ create trigger tr_after_row after insert on test_tr insert into test_tr values(1); insert into test_tr values(2),(3),(4); +SELECT * FROM test_tr ORDER BY i; + drop table test_tr; +create table test_tr(i int) using cstore_tableam; + +-- we should be able to clean-up and continue gracefully if we +-- error out in AFTER STATEMENT triggers. +CREATE SEQUENCE counter START 100; +create or replace function trs_after_erroring() returns trigger language plpgsql as $$ +BEGIN + IF nextval('counter') % 2 = 0 THEN + RAISE EXCEPTION '%', 'error'; + END IF; + RETURN NULL; +END; +$$; + +create trigger tr_after_stmt_erroring after insert on test_tr + referencing new table as new_table + for each statement execute procedure trs_after_erroring(); + +-- +-- Once upon a time we didn't clean-up properly after erroring out. Here the first +-- statement errors, but the second succeeds. In old times, because of failure in +-- clean-up, both rows were visible. But only the 2nd one should be visible. 
+-- +insert into test_tr values(5); +insert into test_tr values(6); +SELECT * FROM test_tr ORDER BY i; + +drop table test_tr; + +-- +-- https://github.com/citusdata/cstore2/issues/32 +-- +create table events( + user_id bigint, + event_id bigint, + event_time timestamp default now(), + value float default random()) + PARTITION BY RANGE (event_time); + +create table events_p2020_11_04_102965 +PARTITION OF events FOR VALUES FROM ('2020-11-04 00:00:00+01') TO ('2020-11-05 00:00:00+01') +USING cstore_tableam; + +create table events_trigger_target( + user_id bigint, + avg float, + __count__ bigint +) USING cstore_tableam; + +CREATE OR REPLACE FUNCTION user_value_by_day() + RETURNS trigger + LANGUAGE plpgsql +AS $function$ +BEGIN + IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN + EXECUTE format($exec_format$INSERT INTO %s AS __mat__ SELECT user_id, 0.1 AS avg, pg_catalog.count(*) AS __count__ FROM __ins__ events GROUP BY user_id; + $exec_format$, TG_ARGV[0]); + END IF; + IF (TG_OP = 'DELETE' OR TG_OP = 'UPDATE') THEN + RAISE EXCEPTION $ex$MATERIALIZED VIEW 'user_value_by_day' on table 'events' does not support UPDATE/DELETE$ex$; + END IF; + IF (TG_OP = 'TRUNCATE') THEN + EXECUTE format($exec_format$TRUNCATE TABLE %s; $exec_format$, TG_ARGV[0]); + END IF; + RETURN NULL; +END; +$function$; + +create trigger "user_value_by_day_INSERT" AFTER INSERT ON events + REFERENCING NEW TABLE AS __ins__ + FOR EACH STATEMENT EXECUTE FUNCTION user_value_by_day('events_trigger_target'); + +COPY events FROM STDIN WITH (FORMAT 'csv'); +1,1,"2020-11-04 15:54:02.226999-08",1.1 +2,3,"2020-11-04 16:54:02.226999-08",2.2 +\. + +SELECT * FROM events ORDER BY user_id; +SELECT * FROM events_trigger_target ORDER BY user_id; + +DROP TABLE events;