Implements write state management for tuple inserts.

TableAM API doesn't allow us to pass around a state variable along all of the tuple inserts belonging to the same command. We require this in columnar store, since we batch them, and when we have enough rows we flush them as stripes.

To do that, we keep a (relfilenode) -> stack of (subxact id, TableWriteState) global mapping.

**Inserts**

Whenever we want to insert a tuple, we look up for the relation's relfilenode in this mapping. If top of the stack matches current subtransaction, we us the existing TableWriteState. Otherwise, we allocate a new TableWriteState and push it on top of stack.

**(Sub)Transaction Commit/Aborts**

When the subtransaction or transaction is committed, we flush and pop all entries matching current SubTransactionId.

When the subtransaction or transaction is committed, we pop all entries matching current SubTransactionId and discard them without flushing.

**Reads**

Since we might have unwritten rows which needs to be read by a table scan, we flush write states on SELECTs. Since flushing the write state of upper transactions in a subtransaction will cause metadata being written in wrong subtransaction, we ERROR out if any of the upper subtransactions have unflushed rows.

**Table Drops**

We record in which subtransaction the table was dropped. When committing a subtransaction in which table was dropped, we propagate the drop to upper transaction. When aborting a subtransaction in which table was dropped, we mark table as not deleted.
pull/4319/head
Hadi Moshayedi 2020-11-17 11:54:23 -08:00
parent 2e09116b30
commit 97cba2d5b6
16 changed files with 1354 additions and 140 deletions

View File

@ -549,7 +549,7 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString)
#endif
/* init state to write to the cstore file */
writeState = CStoreBeginWrite(relation,
writeState = CStoreBeginWrite(relation->rd_node,
cstoreOptions->compressionType,
cstoreOptions->stripeRowCount,
cstoreOptions->blockRowCount,
@ -1992,13 +1992,16 @@ CStoreBeginForeignInsert(ModifyTableState *modifyTableState, ResultRelInfo *rela
CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableOid);
TupleDesc tupleDescriptor = RelationGetDescr(relationInfo->ri_RelationDesc);
TableWriteState *writeState = CStoreBeginWrite(relation,
TableWriteState *writeState = CStoreBeginWrite(relation->rd_node,
cstoreOptions->compressionType,
cstoreOptions->stripeRowCount,
cstoreOptions->blockRowCount,
tupleDescriptor);
relationInfo->ri_FdwState = (void *) writeState;
/* keep the lock */
relation_close(relation, NoLock);
}
@ -2055,10 +2058,7 @@ CStoreEndForeignInsert(EState *executorState, ResultRelInfo *relationInfo)
/* writeState is NULL during Explain queries */
if (writeState != NULL)
{
Relation relation = writeState->relation;
CStoreEndWrite(writeState);
heap_close(relation, RowExclusiveLock);
}
}

View File

@ -134,18 +134,19 @@ InitCStoreDataFileMetadata(Oid relfilenode, int blockRowCount, int stripeRowCoun
{
NameData compressionName = { 0 };
namestrcpy(&compressionName, CompressionTypeStr(compression));
bool nulls[Natts_cstore_data_files] = { 0 };
Datum values[Natts_cstore_data_files] = {
ObjectIdGetDatum(relfilenode),
Int32GetDatum(blockRowCount),
Int32GetDatum(stripeRowCount),
NameGetDatum(&compressionName),
0, /* to be filled below */
Int32GetDatum(CSTORE_VERSION_MAJOR),
Int32GetDatum(CSTORE_VERSION_MINOR)
};
namestrcpy(&compressionName, CompressionTypeStr(compression));
values[Anum_cstore_data_files_compression - 1] = NameGetDatum(&compressionName);
DeleteDataFileMetadataRowIfExists(relfilenode);
Oid cstoreDataFilesOid = CStoreDataFilesRelationId();
@ -171,6 +172,7 @@ UpdateCStoreDataFileMetadata(Oid relfilenode, int blockRowCount, int stripeRowCo
Datum values[Natts_cstore_data_files] = { 0 };
bool isnull[Natts_cstore_data_files] = { 0 };
bool replace[Natts_cstore_data_files] = { 0 };
bool changed = false;
Relation cstoreDataFiles = heap_open(CStoreDataFilesRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(cstoreDataFiles);
@ -192,7 +194,6 @@ UpdateCStoreDataFileMetadata(Oid relfilenode, int blockRowCount, int stripeRowCo
Form_cstore_data_files metadata = (Form_cstore_data_files) GETSTRUCT(heapTuple);
bool changed = false;
if (metadata->block_row_count != blockRowCount)
{
values[Anum_cstore_data_files_block_row_count - 1] = Int32GetDatum(blockRowCount);

View File

@ -68,10 +68,7 @@ typedef struct CStoreScanDescData
typedef struct CStoreScanDescData *CStoreScanDesc;
static TableWriteState *CStoreWriteState = NULL;
static ExecutorEnd_hook_type PreviousExecutorEndHook = NULL;
static MemoryContext CStoreContext = NULL;
static object_access_hook_type prevObjectAccessHook = NULL;
static object_access_hook_type PrevObjectAccessHook = NULL;
static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL;
/* forward declaration for static functions */
@ -122,13 +119,13 @@ CStoreTableAMDefaultOptions()
* CStoreTableAMGetOptions returns the options based on a relation. It is advised the
* relation is a cstore table am table, if not it will raise an error
*/
static CStoreOptions *
CStoreTableAMGetOptions(Relation rel)
CStoreOptions *
CStoreTableAMGetOptions(Oid relfilenode)
{
Assert(rel != NULL);
Assert(OidIsValid(relfilenode));
CStoreOptions *cstoreOptions = palloc0(sizeof(CStoreOptions));
DataFileMetadata *metadata = ReadDataFileMetadata(rel->rd_node.relNode, false);
DataFileMetadata *metadata = ReadDataFileMetadata(relfilenode, false);
cstoreOptions->compressionType = metadata->compression;
cstoreOptions->stripeRowCount = metadata->stripeRowCount;
cstoreOptions->blockRowCount = metadata->blockRowCount;
@ -136,66 +133,6 @@ CStoreTableAMGetOptions(Relation rel)
}
static MemoryContext
GetCStoreMemoryContext()
{
if (CStoreContext == NULL)
{
CStoreContext = AllocSetContextCreate(TopMemoryContext, "cstore context",
ALLOCSET_DEFAULT_SIZES);
}
return CStoreContext;
}
static void
ResetCStoreMemoryContext()
{
if (CStoreContext != NULL)
{
MemoryContextReset(CStoreContext);
}
}
static void
cstore_init_write_state(Relation relation)
{
if (CStoreWriteState != NULL)
{
/* TODO: consider whether it's possible for a new write to start */
/* before an old one is flushed */
Assert(CStoreWriteState->relation->rd_id == relation->rd_id);
}
if (CStoreWriteState == NULL)
{
CStoreOptions *cstoreOptions = CStoreTableAMGetOptions(relation);
TupleDesc tupdesc = RelationGetDescr(relation);
elog(LOG, "initializing write state for relation %d", relation->rd_id);
CStoreWriteState = CStoreBeginWrite(relation,
cstoreOptions->compressionType,
cstoreOptions->stripeRowCount,
cstoreOptions->blockRowCount,
tupdesc);
}
}
static void
cstore_free_write_state()
{
if (CStoreWriteState != NULL)
{
elog(LOG, "flushing write state for relation %d",
CStoreWriteState->relation->rd_id);
CStoreEndWrite(CStoreWriteState);
CStoreWriteState = NULL;
}
}
static List *
RelationColumnList(Relation rel)
{
@ -263,9 +200,10 @@ cstore_beginscan_extended(Relation relation, Snapshot snapshot,
uint32 flags, Bitmapset *attr_needed, List *scanQual)
{
TupleDesc tupdesc = relation->rd_att;
Oid relfilenode = relation->rd_node.relNode;
CStoreScanDesc scan = palloc(sizeof(CStoreScanDescData));
List *neededColumnList = NIL;
MemoryContext oldContext = MemoryContextSwitchTo(GetCStoreMemoryContext());
MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext);
ListCell *columnCell = NULL;
scan->cs_base.rs_rd = relation;
@ -275,6 +213,15 @@ cstore_beginscan_extended(Relation relation, Snapshot snapshot,
scan->cs_base.rs_flags = flags;
scan->cs_base.rs_parallel = parallel_scan;
if (PendingWritesInUpperTransactions(relfilenode, GetCurrentSubTransactionId()))
{
elog(ERROR,
"cannot read from table when there is unflushed data in upper transactions");
}
FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId());
List *columnList = RelationColumnList(relation);
/* only collect columns that we need for the scan */
@ -319,7 +266,7 @@ static bool
cstore_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
{
CStoreScanDesc scan = (CStoreScanDesc) sscan;
MemoryContext oldContext = MemoryContextSwitchTo(GetCStoreMemoryContext());
MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext);
ExecClearTuple(slot);
@ -437,9 +384,9 @@ static void
cstore_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
int options, BulkInsertState bistate)
{
MemoryContext oldContext = MemoryContextSwitchTo(GetCStoreMemoryContext());
cstore_init_write_state(relation);
TableWriteState *writeState = cstore_init_write_state(relation->rd_node,
RelationGetDescr(relation),
GetCurrentSubTransactionId());
HeapTuple heapTuple = ExecCopySlotHeapTuple(slot);
if (HeapTupleHasExternal(heapTuple))
@ -453,8 +400,7 @@ cstore_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
slot_getallattrs(slot);
CStoreWriteRow(CStoreWriteState, slot->tts_values, slot->tts_isnull);
MemoryContextSwitchTo(oldContext);
CStoreWriteRow(writeState, slot->tts_values, slot->tts_isnull);
}
@ -479,9 +425,9 @@ static void
cstore_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
CommandId cid, int options, BulkInsertState bistate)
{
MemoryContext oldContext = MemoryContextSwitchTo(GetCStoreMemoryContext());
cstore_init_write_state(relation);
TableWriteState *writeState = cstore_init_write_state(relation->rd_node,
RelationGetDescr(relation),
GetCurrentSubTransactionId());
for (int i = 0; i < ntuples; i++)
{
@ -499,9 +445,8 @@ cstore_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
slot_getallattrs(tupleSlot);
CStoreWriteRow(CStoreWriteState, tupleSlot->tts_values, tupleSlot->tts_isnull);
CStoreWriteRow(writeState, tupleSlot->tts_values, tupleSlot->tts_isnull);
}
MemoryContextSwitchTo(oldContext);
}
@ -537,11 +482,9 @@ cstore_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot,
static void
cstore_finish_bulk_insert(Relation relation, int options)
{
/*TODO: flush relation like for heap? */
/* free write state or only in ExecutorEnd_hook? */
/* for COPY */
cstore_free_write_state();
/*
* Nothing to do here. We keep write states live until transaction end.
*/
}
@ -556,6 +499,9 @@ cstore_relation_set_new_filenode(Relation rel,
uint64 blockRowCount = 0;
uint64 stripeRowCount = 0;
CompressionType compression = 0;
Oid oldRelfilenode = rel->rd_node.relNode;
MarkRelfilenodeDropped(oldRelfilenode, GetCurrentSubTransactionId());
if (metadata != NULL)
{
@ -589,7 +535,10 @@ cstore_relation_set_new_filenode(Relation rel,
static void
cstore_relation_nontransactional_truncate(Relation rel)
{
DataFileMetadata *metadata = ReadDataFileMetadata(rel->rd_node.relNode, false);
Oid relfilenode = rel->rd_node.relNode;
DataFileMetadata *metadata = ReadDataFileMetadata(relfilenode, false);
NonTransactionDropWriteState(relfilenode);
/*
* No need to set new relfilenode, since the table was created in this
@ -651,23 +600,23 @@ cstore_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
* relation first.
*/
CStoreOptions *cstoreOptions = CStoreTableAMGetOptions(OldHeap);
CStoreOptions *cstoreOptions = CStoreTableAMGetOptions(OldHeap->rd_node.relNode);
UpdateCStoreDataFileMetadata(NewHeap->rd_node.relNode,
cstoreOptions->blockRowCount,
cstoreOptions->stripeRowCount,
cstoreOptions->compressionType);
cstoreOptions = CStoreTableAMGetOptions(NewHeap);
cstoreOptions = CStoreTableAMGetOptions(NewHeap->rd_node.relNode);
TableWriteState *writeState = CStoreBeginWrite(NewHeap,
TableWriteState *writeState = CStoreBeginWrite(NewHeap->rd_node,
cstoreOptions->compressionType,
cstoreOptions->stripeRowCount,
cstoreOptions->blockRowCount,
targetDesc);
TableReadState *readState = CStoreBeginRead(OldHeap, sourceDesc, RelationColumnList(
OldHeap), NULL);
TableReadState *readState = CStoreBeginRead(OldHeap, sourceDesc,
RelationColumnList(OldHeap), NULL);
Datum *values = palloc0(sourceDesc->natts * sizeof(Datum));
bool *nulls = palloc0(sourceDesc->natts * sizeof(bool));
@ -1046,18 +995,61 @@ cstore_scan_sample_next_tuple(TableScanDesc scan, SampleScanState *scanstate,
static void
CStoreExecutorEnd(QueryDesc *queryDesc)
CStoreXactCallback(XactEvent event, void *arg)
{
cstore_free_write_state();
if (PreviousExecutorEndHook)
switch (event)
{
PreviousExecutorEndHook(queryDesc);
case XACT_EVENT_COMMIT:
case XACT_EVENT_PARALLEL_COMMIT:
case XACT_EVENT_PREPARE:
{
/* nothing to do */
break;
}
case XACT_EVENT_ABORT:
case XACT_EVENT_PARALLEL_ABORT:
{
DiscardWriteStateForAllRels(GetCurrentSubTransactionId(), 0);
break;
}
case XACT_EVENT_PRE_COMMIT:
case XACT_EVENT_PARALLEL_PRE_COMMIT:
case XACT_EVENT_PRE_PREPARE:
{
FlushWriteStateForAllRels(GetCurrentSubTransactionId(), 0);
break;
}
}
else
}
static void
CStoreSubXactCallback(SubXactEvent event, SubTransactionId mySubid,
SubTransactionId parentSubid, void *arg)
{
switch (event)
{
standard_ExecutorEnd(queryDesc);
case SUBXACT_EVENT_START_SUB:
case SUBXACT_EVENT_COMMIT_SUB:
{
/* nothing to do */
break;
}
case SUBXACT_EVENT_ABORT_SUB:
{
DiscardWriteStateForAllRels(mySubid, parentSubid);
break;
}
case SUBXACT_EVENT_PRE_COMMIT_SUB:
{
FlushWriteStateForAllRels(mySubid, parentSubid);
break;
}
}
ResetCStoreMemoryContext();
}
@ -1109,12 +1101,13 @@ CStoreTableAMProcessUtility(PlannedStmt * plannedStatement,
void
cstore_tableam_init()
{
PreviousExecutorEndHook = ExecutorEnd_hook;
ExecutorEnd_hook = CStoreExecutorEnd;
RegisterXactCallback(CStoreXactCallback, NULL);
RegisterSubXactCallback(CStoreSubXactCallback, NULL);
PreviousProcessUtilityHook = (ProcessUtility_hook != NULL) ?
ProcessUtility_hook : standard_ProcessUtility;
ProcessUtility_hook = CStoreTableAMProcessUtility;
prevObjectAccessHook = object_access_hook;
PrevObjectAccessHook = object_access_hook;
object_access_hook = CStoreTableAMObjectAccessHook;
cstore_customscan_init();
@ -1124,7 +1117,7 @@ cstore_tableam_init()
void
cstore_tableam_finish()
{
ExecutorEnd_hook = PreviousExecutorEndHook;
object_access_hook = PrevObjectAccessHook;
}
@ -1140,9 +1133,9 @@ CStoreTableAMObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId
subId,
void *arg)
{
if (prevObjectAccessHook)
if (PrevObjectAccessHook)
{
prevObjectAccessHook(access, classId, objectId, subId, arg);
PrevObjectAccessHook(access, classId, objectId, subId, arg);
}
/*
@ -1166,7 +1159,10 @@ CStoreTableAMObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId
* tableam tables storage is managed by postgres.
*/
Relation rel = table_open(objectId, AccessExclusiveLock);
DeleteDataFileMetadataRowIfExists(rel->rd_node.relNode);
Oid relfilenode = rel->rd_node.relNode;
DeleteDataFileMetadataRowIfExists(relfilenode);
MarkRelfilenodeDropped(relfilenode, GetCurrentSubTransactionId());
/* keep the lock since we did physical changes to the relation */
table_close(rel, NoLock);

View File

@ -18,6 +18,7 @@
#include "safe_lib.h"
#include "access/heapam.h"
#include "access/nbtree.h"
#include "catalog/pg_am.h"
#include "miscadmin.h"
@ -25,6 +26,7 @@
#include "storage/smgr.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenodemap.h"
#include "columnar/cstore.h"
#include "columnar/cstore_version_compat.h"
@ -58,7 +60,7 @@ static StringInfo CopyStringInfo(StringInfo sourceString);
* will be added.
*/
TableWriteState *
CStoreBeginWrite(Relation relation,
CStoreBeginWrite(RelFileNode relfilenode,
CompressionType compressionType,
uint64 stripeMaxRowCount, uint32 blockRowCount,
TupleDesc tupleDescriptor)
@ -101,11 +103,11 @@ CStoreBeginWrite(Relation relation,
blockRowCount);
TableWriteState *writeState = palloc0(sizeof(TableWriteState));
writeState->relation = relation;
writeState->relfilenode = relfilenode;
writeState->compressionType = compressionType;
writeState->stripeMaxRowCount = stripeMaxRowCount;
writeState->blockRowCount = blockRowCount;
writeState->tupleDescriptor = tupleDescriptor;
writeState->tupleDescriptor = CreateTupleDescCopy(tupleDescriptor);
writeState->comparisonFunctionArray = comparisonFunctionArray;
writeState->stripeBuffers = NULL;
writeState->stripeSkipList = NULL;
@ -205,11 +207,7 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul
stripeBuffers->rowCount++;
if (stripeBuffers->rowCount >= writeState->stripeMaxRowCount)
{
FlushStripe(writeState);
/* set stripe data and skip list to NULL so they are recreated next time */
writeState->stripeBuffers = NULL;
writeState->stripeSkipList = NULL;
CStoreFlushPendingWrites(writeState);
}
MemoryContextSwitchTo(oldContext);
@ -225,17 +223,7 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul
void
CStoreEndWrite(TableWriteState *writeState)
{
StripeBuffers *stripeBuffers = writeState->stripeBuffers;
if (stripeBuffers != NULL)
{
MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext);
FlushStripe(writeState);
MemoryContextReset(writeState->stripeWriteContext);
MemoryContextSwitchTo(oldContext);
}
CStoreFlushPendingWrites(writeState);
MemoryContextDelete(writeState->stripeWriteContext);
pfree(writeState->comparisonFunctionArray);
@ -244,6 +232,25 @@ CStoreEndWrite(TableWriteState *writeState)
}
void
CStoreFlushPendingWrites(TableWriteState *writeState)
{
StripeBuffers *stripeBuffers = writeState->stripeBuffers;
if (stripeBuffers != NULL)
{
MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext);
FlushStripe(writeState);
/* set stripe data and skip list to NULL so they are recreated next time */
writeState->stripeBuffers = NULL;
writeState->stripeSkipList = NULL;
MemoryContextSwitchTo(oldContext);
}
}
/*
* CreateEmptyStripeBuffers allocates an empty StripeBuffers structure with the given
* column count.
@ -410,6 +417,10 @@ FlushStripe(TableWriteState *writeState)
uint64 stripeSize = 0;
uint64 stripeRowCount = 0;
Oid relationId = RelidByRelfilenode(writeState->relfilenode.spcNode,
writeState->relfilenode.relNode);
Relation relation = relation_open(relationId, NoLock);
/*
* check if the last block needs serialization , the last block was not serialized
* if it was not full yet, e.g. (rowCount > 0)
@ -459,7 +470,7 @@ FlushStripe(TableWriteState *writeState)
stripeSkipList->blockSkipNodeArray[0][blockIndex].rowCount;
}
stripeMetadata = ReserveStripe(writeState->relation, stripeSize,
stripeMetadata = ReserveStripe(relation, stripeSize,
stripeRowCount, columnCount, blockCount,
blockRowCount);
@ -486,7 +497,7 @@ FlushStripe(TableWriteState *writeState)
columnBuffers->blockBuffersArray[blockIndex];
StringInfo existsBuffer = blockBuffers->existsBuffer;
WriteToSmgr(writeState->relation, currentFileOffset,
WriteToSmgr(relation, currentFileOffset,
existsBuffer->data, existsBuffer->len);
currentFileOffset += existsBuffer->len;
}
@ -497,16 +508,18 @@ FlushStripe(TableWriteState *writeState)
columnBuffers->blockBuffersArray[blockIndex];
StringInfo valueBuffer = blockBuffers->valueBuffer;
WriteToSmgr(writeState->relation, currentFileOffset,
WriteToSmgr(relation, currentFileOffset,
valueBuffer->data, valueBuffer->len);
currentFileOffset += valueBuffer->len;
}
}
/* create skip list and footer buffers */
SaveStripeSkipList(writeState->relation->rd_node.relNode,
SaveStripeSkipList(relation->rd_node.relNode,
stripeMetadata.id,
stripeSkipList, tupleDescriptor);
relation_close(relation, NoLock);
}
@ -747,3 +760,10 @@ CopyStringInfo(StringInfo sourceString)
return targetString;
}
bool
ContainsPendingWrites(TableWriteState *state)
{
return state->stripeBuffers != NULL && state->stripeBuffers->rowCount != 0;
}

View File

@ -0,0 +1,384 @@
#include "citus_version.h"
#if HAS_TABLEAM
#include "postgres.h"
#include <math.h>
#include "miscadmin.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/multixact.h"
#include "access/rewriteheap.h"
#include "access/tsmapi.h"
#if PG_VERSION_NUM >= 130000
#include "access/heaptoast.h"
#else
#include "access/tuptoaster.h"
#endif
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/index.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_am.h"
#include "catalog/pg_trigger.h"
#include "catalog/storage.h"
#include "catalog/storage_xlog.h"
#include "commands/progress.h"
#include "commands/vacuum.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"
#include "optimizer/plancat.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "utils/pg_rusage.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "columnar/cstore.h"
#include "columnar/cstore_customscan.h"
#include "columnar/cstore_tableam.h"
#include "columnar/cstore_version_compat.h"
/*
* Mapping from relfilenode to WriteStateMapEntry. This keeps write state for
* each relation.
*/
static HTAB *WriteStateMap = NULL;
/* memory context for allocating WriteStateMap & all write states */
static MemoryContext WriteStateContext = NULL;
/*
* Each member of the writeStateStack in WriteStateMapEntry. This means that
* we did some inserts in the subtransaction subXid, and the state of those
* inserts is stored at writeState. Those writes can be flushed or unflushed.
*/
typedef struct SubXidWriteState
{
SubTransactionId subXid;
TableWriteState *writeState;
struct SubXidWriteState *next;
} SubXidWriteState;
/*
* An entry in WriteStateMap.
*/
typedef struct WriteStateMapEntry
{
/* key of the entry */
Oid relfilenode;
/*
* If a table is dropped, we set dropped to true and set dropSubXid to the
* id of the subtransaction in which the drop happened.
*/
bool dropped;
SubTransactionId dropSubXid;
/*
* Stack of SubXidWriteState where first element is top of the stack. When
* inserts happen, we look at top of the stack. If top of stack belongs to
* current subtransaction, we forward writes to its writeState. Otherwise,
* we create a new stack entry for current subtransaction and push it to
* the stack, and forward writes to that.
*/
SubXidWriteState *writeStateStack;
} WriteStateMapEntry;
/*
* Memory context reset callback so we reset WriteStateMap to NULL at the end
* of transaction. WriteStateMap is allocated in & WriteStateMap, so its
* leaked reference can cause memory issues.
*/
static MemoryContextCallback cleanupCallback;
static void
CleanupWriteStateMap(void *arg)
{
WriteStateMap = NULL;
WriteStateContext = NULL;
}
TableWriteState *
cstore_init_write_state(RelFileNode relfilenode, TupleDesc tupdesc,
SubTransactionId currentSubXid)
{
bool found;
/*
* If this is the first call in current transaction, allocate the hash
* table.
*/
if (WriteStateMap == NULL)
{
WriteStateContext =
AllocSetContextCreate(
TopTransactionContext,
"Column Store Write State Management Context",
ALLOCSET_DEFAULT_SIZES);
HASHCTL info;
uint32 hashFlags = (HASH_ELEM | HASH_CONTEXT);
memset(&info, 0, sizeof(info));
info.keysize = sizeof(Oid);
info.entrysize = sizeof(WriteStateMapEntry);
info.hcxt = WriteStateContext;
WriteStateMap = hash_create("column store write state map",
64, &info, hashFlags);
cleanupCallback.arg = NULL;
cleanupCallback.func = &CleanupWriteStateMap;
cleanupCallback.next = NULL;
MemoryContextRegisterResetCallback(WriteStateContext, &cleanupCallback);
}
WriteStateMapEntry *hashEntry = hash_search(WriteStateMap, &relfilenode.relNode,
HASH_ENTER, &found);
if (!found)
{
hashEntry->writeStateStack = NULL;
hashEntry->dropped = false;
}
Assert(!hashEntry->dropped);
/*
* If top of stack belongs to the current subtransaction, return its
* writeState, ...
*/
if (hashEntry->writeStateStack != NULL)
{
SubXidWriteState *stackHead = hashEntry->writeStateStack;
if (stackHead->subXid == currentSubXid)
{
return stackHead->writeState;
}
}
/*
* ... otherwise we need to create a new stack entry for the current
* subtransaction.
*/
MemoryContext oldContext = MemoryContextSwitchTo(WriteStateContext);
CStoreOptions *cstoreOptions = CStoreTableAMGetOptions(relfilenode.relNode);
SubXidWriteState *stackEntry = palloc0(sizeof(SubXidWriteState));
stackEntry->writeState = CStoreBeginWrite(relfilenode,
cstoreOptions->compressionType,
cstoreOptions->stripeRowCount,
cstoreOptions->blockRowCount,
tupdesc);
stackEntry->subXid = currentSubXid;
stackEntry->next = hashEntry->writeStateStack;
hashEntry->writeStateStack = stackEntry;
MemoryContextSwitchTo(oldContext);
return stackEntry->writeState;
}
/*
* Flushes pending writes for given relfilenode in the given subtransaction.
*/
void
FlushWriteStateForRelfilenode(Oid relfilenode, SubTransactionId currentSubXid)
{
WriteStateMapEntry *entry;
bool found = false;
if (WriteStateMap)
{
entry = hash_search(WriteStateMap, &relfilenode, HASH_FIND, &found);
}
Assert(!found || !entry->dropped);
if (found && entry->writeStateStack != NULL)
{
SubXidWriteState *stackEntry = entry->writeStateStack;
if (stackEntry->subXid == currentSubXid)
{
CStoreFlushPendingWrites(stackEntry->writeState);
}
}
}
/*
* Helper function for FlushWriteStateForAllRels and DiscardWriteStateForAllRels.
* Pops all of write states for current subtransaction, and depending on "commit"
* either flushes them or discards them. This also takes into account dropped
* tables, and either propagates the dropped flag to parent subtransaction or
* rolls back abort.
*/
static void
PopWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId parentSubXid,
bool commit)
{
HASH_SEQ_STATUS status;
WriteStateMapEntry *entry;
if (WriteStateMap == NULL)
{
return;
}
hash_seq_init(&status, WriteStateMap);
while ((entry = hash_seq_search(&status)) != 0)
{
if (entry->writeStateStack == NULL)
{
continue;
}
/*
* If the table has been dropped in current subtransaction, either
* commit the drop or roll it back.
*/
if (entry->dropped)
{
if (entry->dropSubXid == currentSubXid)
{
if (commit)
{
/* elevate drop to the upper subtransaction */
entry->dropSubXid = parentSubXid;
}
else
{
/* abort the drop */
entry->dropped = false;
}
}
}
/*
* Otherwise, commit or discard pending writes.
*/
else
{
SubXidWriteState *stackHead = entry->writeStateStack;
if (stackHead->subXid == currentSubXid)
{
if (commit)
{
CStoreEndWrite(stackHead->writeState);
}
entry->writeStateStack = stackHead->next;
}
}
}
}
/*
* Called when current subtransaction is committed.
*/
void
FlushWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId parentSubXid)
{
PopWriteStateForAllRels(currentSubXid, parentSubXid, true);
}
/*
* Called when current subtransaction is aborted.
*/
void
DiscardWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId parentSubXid)
{
PopWriteStateForAllRels(currentSubXid, parentSubXid, false);
}
/*
* Called when the given relfilenode is dropped.
*/
void
MarkRelfilenodeDropped(Oid relfilenode, SubTransactionId currentSubXid)
{
bool found = false;
if (WriteStateMap == NULL)
{
return;
}
WriteStateMapEntry *entry = hash_search(WriteStateMap, &relfilenode, HASH_FIND,
&found);
if (!found || entry->dropped)
{
return;
}
entry->dropped = true;
entry->dropSubXid = currentSubXid;
}
/*
* Called when the given relfilenode is dropped in non-transactional TRUNCATE.
*/
void
NonTransactionDropWriteState(Oid relfilenode)
{
if (WriteStateMap)
{
hash_search(WriteStateMap, &relfilenode, HASH_REMOVE, false);
}
}
/*
* Returns true if there are any pending writes in upper transactions.
*/
bool
PendingWritesInUpperTransactions(Oid relfilenode, SubTransactionId currentSubXid)
{
WriteStateMapEntry *entry;
bool found = false;
if (WriteStateMap)
{
entry = hash_search(WriteStateMap, &relfilenode, HASH_FIND, &found);
}
if (found && entry->writeStateStack != NULL)
{
SubXidWriteState *stackEntry = entry->writeStateStack;
while (stackEntry != NULL)
{
if (stackEntry->subXid != currentSubXid &&
ContainsPendingWrites(stackEntry->writeState))
{
return true;
}
stackEntry = stackEntry->next;
}
}
return false;
}
#endif

View File

@ -20,6 +20,7 @@
#include "nodes/parsenodes.h"
#include "storage/bufpage.h"
#include "storage/lockdefs.h"
#include "storage/relfilenode.h"
#include "utils/relcache.h"
#include "utils/snapmgr.h"
@ -224,7 +225,7 @@ typedef struct TableWriteState
CompressionType compressionType;
TupleDesc tupleDescriptor;
FmgrInfo **comparisonFunctionArray;
Relation relation;
RelFileNode relfilenode;
MemoryContext stripeWriteContext;
StripeBuffers *stripeBuffers;
@ -251,14 +252,16 @@ extern void cstore_init(void);
extern CompressionType ParseCompressionType(const char *compressionTypeString);
/* Function declarations for writing to a cstore file */
extern TableWriteState * CStoreBeginWrite(Relation relation,
extern TableWriteState * CStoreBeginWrite(RelFileNode relfilenode,
CompressionType compressionType,
uint64 stripeMaxRowCount,
uint32 blockRowCount,
TupleDesc tupleDescriptor);
extern void CStoreWriteRow(TableWriteState *state, Datum *columnValues,
bool *columnNulls);
extern void CStoreFlushPendingWrites(TableWriteState *state);
extern void CStoreEndWrite(TableWriteState *state);
extern bool ContainsPendingWrites(TableWriteState *state);
/* Function declarations for reading from a cstore file */
extern TableReadState * CStoreBeginRead(Relation relation,
@ -281,6 +284,7 @@ extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer,
CompressionType compressionType);
extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType);
extern char * CompressionTypeStr(CompressionType type);
extern CStoreOptions * CStoreTableAMGetOptions(Oid relfilenode);
/* cstore_metadata_tables.c */
extern void DeleteDataFileMetadataRowIfExists(Oid relfilenode);
@ -300,6 +304,22 @@ extern StripeSkipList * ReadStripeSkipList(Oid relfilenode, uint64 stripe,
TupleDesc tupleDescriptor,
uint32 blockCount);
/* write_state_management.c */
extern TableWriteState * cstore_init_write_state(RelFileNode relfilenode, TupleDesc
tupdesc,
SubTransactionId currentSubXid);
extern void FlushWriteStateForRelfilenode(Oid relfilenode, SubTransactionId
currentSubXid);
extern void FlushWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId
parentSubXid);
extern void DiscardWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId
parentSubXid);
extern void MarkRelfilenodeDropped(Oid relfilenode, SubTransactionId currentSubXid);
extern void NonTransactionDropWriteState(Oid relfilenode);
extern bool PendingWritesInUpperTransactions(Oid relfilenode,
SubTransactionId currentSubXid);
typedef struct SmgrAddr
{
BlockNumber blockno;

View File

@ -16,3 +16,5 @@ test: am_block_filtering
test: am_join
test: am_trigger
test: am_tableoptions
test: am_recursive
test: am_transactions

View File

@ -0,0 +1,252 @@
CREATE TABLE t1(a int, b int) USING cstore_tableam;
CREATE TABLE t2(a int, b int) USING cstore_tableam;
CREATE FUNCTION f(x INT) RETURNS INT AS $$
INSERT INTO t1 VALUES(x, x * 2) RETURNING b - 1;
$$ LANGUAGE SQL;
--
-- Following query will start a write to t1 before finishing
-- write to t1, so it tests that we handle recursive writes
-- correctly.
--
INSERT INTO t2 SELECT i, f(i) FROM generate_series(1, 5) i;
-- there are no subtransactions, so above statement should batch
-- INSERTs inside the UDF and create on stripe per table.
SELECT relname, count(*) FROM cstore.cstore_stripes a, pg_class b
WHERE a.relfilenode=b.relfilenode AND relname IN ('t1', 't2')
GROUP BY relname
ORDER BY relname;
relname | count
---------------------------------------------------------------------
t1 | 1
t2 | 1
(2 rows)
SELECT * FROM t1 ORDER BY a;
a | b
---------------------------------------------------------------------
1 | 2
2 | 4
3 | 6
4 | 8
5 | 10
(5 rows)
SELECT * FROM t2 ORDER BY a;
a | b
---------------------------------------------------------------------
1 | 1
2 | 3
3 | 5
4 | 7
5 | 9
(5 rows)
TRUNCATE t1;
TRUNCATE t2;
DROP FUNCTION f(INT);
--
-- Test the case when 2 writes are going on concurrently in the
-- same executor, and those 2 writes are dependent.
--
WITH t AS (
INSERT INTO t1 SELECT i, 2*i FROM generate_series(1, 5) i RETURNING *
)
INSERT INTO t2 SELECT t.a, t.a+1 FROM t;
SELECT * FROM t1;
a | b
---------------------------------------------------------------------
1 | 2
2 | 4
3 | 6
4 | 8
5 | 10
(5 rows)
SELECT * FROM t2;
a | b
---------------------------------------------------------------------
1 | 2
2 | 3
3 | 4
4 | 5
5 | 6
(5 rows)
TRUNCATE t1;
TRUNCATE t2;
--
-- Test the case when there are 2 independent inserts in a CTE.
-- Also tests the case where some of the tuple_inserts happen in
-- ExecutorFinish() instead of ExecutorRun().
--
WITH t AS (
INSERT INTO t1 SELECT i, 2*i FROM generate_series(1, 5) i RETURNING *
)
INSERT INTO t2 SELECT i, (select count(*) from t1) FROM generate_series(1, 3) i;
SELECT * FROM t1;
a | b
---------------------------------------------------------------------
1 | 2
2 | 4
3 | 6
4 | 8
5 | 10
(5 rows)
SELECT * FROM t2;
a | b
---------------------------------------------------------------------
1 | 0
2 | 0
3 | 0
(3 rows)
TRUNCATE t1;
TRUNCATE t2;
--
-- double insert on the same relation
--
WITH t AS (
INSERT INTO t1 SELECT i, 2*i FROM generate_series(1, 5) i RETURNING *
)
INSERT INTO t1 SELECT t.a, t.a+1 FROM t;
SELECT * FROM t1 ORDER BY a, b;
a | b
---------------------------------------------------------------------
1 | 2
1 | 2
2 | 3
2 | 4
3 | 4
3 | 6
4 | 5
4 | 8
5 | 6
5 | 10
(10 rows)
TRUNCATE t1;
TRUNCATE t2;
--
-- A test where result of a UDF call will depend on execution
-- of previous UDF calls.
--
CREATE FUNCTION g(x INT) RETURNS INT AS $$
INSERT INTO t1 VALUES(x, x * 2);
SELECT count(*)::int FROM t1;
$$ LANGUAGE SQL;
-- t3 and t4 are heap tables to help with cross-checking results
CREATE TABLE t3(a int, b int);
CREATE TABLE t4(a int, b int);
CREATE FUNCTION g2(x INT) RETURNS INT AS $$
INSERT INTO t3 VALUES(x, x * 2);
SELECT count(*)::int FROM t3;
$$ LANGUAGE SQL;
INSERT INTO t2 SELECT i, g(i) FROM generate_series(1, 5) i;
INSERT INTO t4 SELECT i, g2(i) FROM generate_series(1, 5) i;
-- check that t1==t3 and t2==t4.
((table t1) except (table t3)) union ((table t3) except (table t1));
a | b
---------------------------------------------------------------------
(0 rows)
((table t2) except (table t4)) union ((table t4) except (table t2));
a | b
---------------------------------------------------------------------
(0 rows)
SELECT * FROM t2 ORDER BY a, b;
a | b
---------------------------------------------------------------------
1 | 1
2 | 2
3 | 3
4 | 4
5 | 5
(5 rows)
TRUNCATE t1, t2, t3, t4;
--
-- INSERT into the same relation that was INSERTed into in the UDF
--
INSERT INTO t1 SELECT i, g(i) FROM generate_series(1, 3) i;
INSERT INTO t3 SELECT i, g2(i) FROM generate_series(1, 3) i;
SELECT * FROM t1 ORDER BY a, b;
a | b
---------------------------------------------------------------------
1 | 1
1 | 2
2 | 3
2 | 4
3 | 5
3 | 6
(6 rows)
SELECT * FROM t3 ORDER BY a, b;
a | b
---------------------------------------------------------------------
1 | 1
1 | 2
2 | 3
2 | 4
3 | 5
3 | 6
(6 rows)
-- check that t1==t3 and t2==t4.
((table t1) except (table t3)) union ((table t3) except (table t1));
a | b
---------------------------------------------------------------------
(0 rows)
((table t2) except (table t4)) union ((table t4) except (table t2));
a | b
---------------------------------------------------------------------
(0 rows)
DROP FUNCTION g(int), g2(int);
TRUNCATE t1, t2, t3, t4;
--
-- EXCEPTION in plpgsql, which is implemented internally using
-- subtransactions. plgpsql uses SPI to execute INSERT statements.
--
CREATE FUNCTION f(a int) RETURNS VOID AS $$
DECLARE
x int;
BEGIN
INSERT INTO t1 SELECT i, i + 1 FROM generate_series(a, a + 1) i;
x := 10 / a;
INSERT INTO t1 SELECT i, i * 2 FROM generate_series(a + 2, a + 3) i;
EXCEPTION WHEN division_by_zero THEN
INSERT INTO t1 SELECT i, i + 1 FROM generate_series(a + 2, a + 3) i;
END;
$$ LANGUAGE plpgsql;
SELECT f(10);
f
---------------------------------------------------------------------
(1 row)
SELECT f(0), f(20);
f | f
---------------------------------------------------------------------
|
(1 row)
SELECT * FROM t1 ORDER BY a, b;
a | b
---------------------------------------------------------------------
2 | 3
3 | 4
10 | 11
11 | 12
12 | 24
13 | 26
20 | 21
21 | 22
22 | 44
23 | 46
(10 rows)
DROP FUNCTION f(int);
DROP TABLE t1, t2, t3, t4;

View File

@ -37,6 +37,12 @@ WHERE a.relfilenode = b.relfilenode AND b.relname = 't';
BEGIN;
SAVEPOINT s0;
INSERT INTO t SELECT i, i+1 FROM generate_series(1, 10) i;
SELECT count(*) FROM t; -- force flush
count
---------------------------------------------------------------------
20
(1 row)
SAVEPOINT s1;
INSERT INTO t SELECT i, i+1 FROM generate_series(1, 10) i;
SELECT count(*) FROM t;

View File

@ -0,0 +1,142 @@
--
-- Testing we handle transactions properly
--
CREATE TABLE t(a int, b int) USING cstore_tableam;
INSERT INTO t SELECT i, 2 * i FROM generate_series(1, 3) i;
SELECT * FROM t ORDER BY a;
a | b
---------------------------------------------------------------------
1 | 2
2 | 4
3 | 6
(3 rows)
-- verify that table rewrites work properly
BEGIN;
ALTER TABLE t ALTER COLUMN b TYPE float4 USING (b + 0.5)::float4;
INSERT INTO t VALUES (4, 8.5);
SELECT * FROM t ORDER BY a;
a | b
---------------------------------------------------------------------
1 | 2.5
2 | 4.5
3 | 6.5
4 | 8.5
(4 rows)
ROLLBACK;
SELECT * FROM t ORDER BY a;
a | b
---------------------------------------------------------------------
1 | 2
2 | 4
3 | 6
(3 rows)
-- verify truncate rollback
BEGIN;
TRUNCATE t;
INSERT INTO t VALUES (4, 8);
SELECT * FROM t ORDER BY a;
a | b
---------------------------------------------------------------------
4 | 8
(1 row)
SAVEPOINT s1;
TRUNCATE t;
SELECT * FROM t ORDER BY a;
a | b
---------------------------------------------------------------------
(0 rows)
ROLLBACK TO SAVEPOINT s1;
SELECT * FROM t ORDER BY a;
a | b
---------------------------------------------------------------------
4 | 8
(1 row)
ROLLBACK;
-- verify truncate with unflushed data in upper xacts
BEGIN;
INSERT INTO t VALUES (4, 8);
SAVEPOINT s1;
TRUNCATE t;
ROLLBACK TO SAVEPOINT s1;
COMMIT;
SELECT * FROM t ORDER BY a;
a | b
---------------------------------------------------------------------
1 | 2
2 | 4
3 | 6
4 | 8
(4 rows)
-- verify DROP TABLE rollback
BEGIN;
INSERT INTO t VALUES (5, 10);
SELECT * FROM t ORDER BY a;
a | b
---------------------------------------------------------------------
1 | 2
2 | 4
3 | 6
4 | 8
5 | 10
(5 rows)
SAVEPOINT s1;
DROP TABLE t;
SELECT * FROM t ORDER BY a;
ERROR: relation "t" does not exist
ROLLBACK TO SAVEPOINT s1;
SELECT * FROM t ORDER BY a;
a | b
---------------------------------------------------------------------
1 | 2
2 | 4
3 | 6
4 | 8
5 | 10
(5 rows)
ROLLBACK;
-- verify DROP TABLE with unflushed data in upper xacts
BEGIN;
INSERT INTO t VALUES (5, 10);
SAVEPOINT s1;
DROP TABLE t;
SELECT * FROM t ORDER BY a;
ERROR: relation "t" does not exist
ROLLBACK TO SAVEPOINT s1;
COMMIT;
SELECT * FROM t ORDER BY a;
a | b
---------------------------------------------------------------------
1 | 2
2 | 4
3 | 6
4 | 8
5 | 10
(5 rows)
-- verify SELECT when unflushed data in upper transactions errors.
BEGIN;
INSERT INTO t VALUES (6, 12);
SAVEPOINT s1;
SELECT * FROM t;
ERROR: cannot read from table when there is unflushed data in upper transactions
ROLLBACK;
SELECT * FROM t ORDER BY a;
a | b
---------------------------------------------------------------------
1 | 2
2 | 4
3 | 6
4 | 8
5 | 10
(5 rows)
DROP TABLE t;

View File

@ -74,4 +74,98 @@ NOTICE: (3)
CONTEXT: PL/pgSQL function trs_after() line 14 at RAISE
NOTICE: (4)
CONTEXT: PL/pgSQL function trs_after() line 14 at RAISE
SELECT * FROM test_tr ORDER BY i;
i
---------------------------------------------------------------------
1
2
3
4
(4 rows)
drop table test_tr;
create table test_tr(i int) using cstore_tableam;
-- we should be able to clean-up and continue gracefully if we
-- error out in AFTER STATEMENT triggers.
CREATE SEQUENCE counter START 100;
create or replace function trs_after_erroring() returns trigger language plpgsql as $$
BEGIN
IF nextval('counter') % 2 = 0 THEN
RAISE EXCEPTION '%', 'error';
END IF;
RETURN NULL;
END;
$$;
create trigger tr_after_stmt_erroring after insert on test_tr
referencing new table as new_table
for each statement execute procedure trs_after_erroring();
--
-- Once upon a time we didn't clean-up properly after erroring out. Here the first
-- statement errors, but the second succeeds. In old times, because of failure in
-- clean-up, both rows were visible. But only the 2nd one should be visible.
--
insert into test_tr values(5);
ERROR: error
CONTEXT: PL/pgSQL function trs_after_erroring() line 4 at RAISE
insert into test_tr values(6);
SELECT * FROM test_tr ORDER BY i;
i
---------------------------------------------------------------------
6
(1 row)
drop table test_tr;
--
-- https://github.com/citusdata/cstore2/issues/32
--
create table events(
user_id bigint,
event_id bigint,
event_time timestamp default now(),
value float default random())
PARTITION BY RANGE (event_time);
create table events_p2020_11_04_102965
PARTITION OF events FOR VALUES FROM ('2020-11-04 00:00:00+01') TO ('2020-11-05 00:00:00+01')
USING cstore_tableam;
create table events_trigger_target(
user_id bigint,
avg float,
__count__ bigint
) USING cstore_tableam;
CREATE OR REPLACE FUNCTION user_value_by_day()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
BEGIN
IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN
EXECUTE format($exec_format$INSERT INTO %s AS __mat__ SELECT user_id, 0.1 AS avg, pg_catalog.count(*) AS __count__ FROM __ins__ events GROUP BY user_id;
$exec_format$, TG_ARGV[0]);
END IF;
IF (TG_OP = 'DELETE' OR TG_OP = 'UPDATE') THEN
RAISE EXCEPTION $ex$MATERIALIZED VIEW 'user_value_by_day' on table 'events' does not support UPDATE/DELETE$ex$;
END IF;
IF (TG_OP = 'TRUNCATE') THEN
EXECUTE format($exec_format$TRUNCATE TABLE %s; $exec_format$, TG_ARGV[0]);
END IF;
RETURN NULL;
END;
$function$;
create trigger "user_value_by_day_INSERT" AFTER INSERT ON events
REFERENCING NEW TABLE AS __ins__
FOR EACH STATEMENT EXECUTE FUNCTION user_value_by_day('events_trigger_target');
COPY events FROM STDIN WITH (FORMAT 'csv');
SELECT * FROM events ORDER BY user_id;
user_id | event_id | event_time | value
---------------------------------------------------------------------
1 | 1 | Wed Nov 04 15:54:02.226999 2020 | 1.1
2 | 3 | Wed Nov 04 16:54:02.226999 2020 | 2.2
(2 rows)
SELECT * FROM events_trigger_target ORDER BY user_id;
user_id | avg | __count__
---------------------------------------------------------------------
1 | 0.1 | 1
2 | 0.1 | 1
(2 rows)
DROP TABLE events;

View File

@ -11,7 +11,7 @@ step s1-insert:
INSERT INTO test_vacuum_vs_insert SELECT i, 2 * i FROM generate_series(1, 3) i;
s2: INFO: statistics for "test_vacuum_vs_insert":
total file size: 24576, total data size: 26
total file size: 16384, total data size: 26
total row count: 3, stripe count: 1, average rows per stripe: 3
block count: 2, containing data for dropped columns: 0, none compressed: 2, pglz compressed: 0

View File

@ -0,0 +1,143 @@
CREATE TABLE t1(a int, b int) USING cstore_tableam;
CREATE TABLE t2(a int, b int) USING cstore_tableam;
CREATE FUNCTION f(x INT) RETURNS INT AS $$
INSERT INTO t1 VALUES(x, x * 2) RETURNING b - 1;
$$ LANGUAGE SQL;
--
-- Following query will start a write to t1 before finishing
-- write to t1, so it tests that we handle recursive writes
-- correctly.
--
INSERT INTO t2 SELECT i, f(i) FROM generate_series(1, 5) i;
-- there are no subtransactions, so above statement should batch
-- INSERTs inside the UDF and create on stripe per table.
SELECT relname, count(*) FROM cstore.cstore_stripes a, pg_class b
WHERE a.relfilenode=b.relfilenode AND relname IN ('t1', 't2')
GROUP BY relname
ORDER BY relname;
SELECT * FROM t1 ORDER BY a;
SELECT * FROM t2 ORDER BY a;
TRUNCATE t1;
TRUNCATE t2;
DROP FUNCTION f(INT);
--
-- Test the case when 2 writes are going on concurrently in the
-- same executor, and those 2 writes are dependent.
--
WITH t AS (
INSERT INTO t1 SELECT i, 2*i FROM generate_series(1, 5) i RETURNING *
)
INSERT INTO t2 SELECT t.a, t.a+1 FROM t;
SELECT * FROM t1;
SELECT * FROM t2;
TRUNCATE t1;
TRUNCATE t2;
--
-- Test the case when there are 2 independent inserts in a CTE.
-- Also tests the case where some of the tuple_inserts happen in
-- ExecutorFinish() instead of ExecutorRun().
--
WITH t AS (
INSERT INTO t1 SELECT i, 2*i FROM generate_series(1, 5) i RETURNING *
)
INSERT INTO t2 SELECT i, (select count(*) from t1) FROM generate_series(1, 3) i;
SELECT * FROM t1;
SELECT * FROM t2;
TRUNCATE t1;
TRUNCATE t2;
--
-- double insert on the same relation
--
WITH t AS (
INSERT INTO t1 SELECT i, 2*i FROM generate_series(1, 5) i RETURNING *
)
INSERT INTO t1 SELECT t.a, t.a+1 FROM t;
SELECT * FROM t1 ORDER BY a, b;
TRUNCATE t1;
TRUNCATE t2;
--
-- A test where result of a UDF call will depend on execution
-- of previous UDF calls.
--
CREATE FUNCTION g(x INT) RETURNS INT AS $$
INSERT INTO t1 VALUES(x, x * 2);
SELECT count(*)::int FROM t1;
$$ LANGUAGE SQL;
-- t3 and t4 are heap tables to help with cross-checking results
CREATE TABLE t3(a int, b int);
CREATE TABLE t4(a int, b int);
CREATE FUNCTION g2(x INT) RETURNS INT AS $$
INSERT INTO t3 VALUES(x, x * 2);
SELECT count(*)::int FROM t3;
$$ LANGUAGE SQL;
INSERT INTO t2 SELECT i, g(i) FROM generate_series(1, 5) i;
INSERT INTO t4 SELECT i, g2(i) FROM generate_series(1, 5) i;
-- check that t1==t3 and t2==t4.
((table t1) except (table t3)) union ((table t3) except (table t1));
((table t2) except (table t4)) union ((table t4) except (table t2));
SELECT * FROM t2 ORDER BY a, b;
TRUNCATE t1, t2, t3, t4;
--
-- INSERT into the same relation that was INSERTed into in the UDF
--
INSERT INTO t1 SELECT i, g(i) FROM generate_series(1, 3) i;
INSERT INTO t3 SELECT i, g2(i) FROM generate_series(1, 3) i;
SELECT * FROM t1 ORDER BY a, b;
SELECT * FROM t3 ORDER BY a, b;
-- check that t1==t3 and t2==t4.
((table t1) except (table t3)) union ((table t3) except (table t1));
((table t2) except (table t4)) union ((table t4) except (table t2));
DROP FUNCTION g(int), g2(int);
TRUNCATE t1, t2, t3, t4;
--
-- EXCEPTION in plpgsql, which is implemented internally using
-- subtransactions. plgpsql uses SPI to execute INSERT statements.
--
CREATE FUNCTION f(a int) RETURNS VOID AS $$
DECLARE
x int;
BEGIN
INSERT INTO t1 SELECT i, i + 1 FROM generate_series(a, a + 1) i;
x := 10 / a;
INSERT INTO t1 SELECT i, i * 2 FROM generate_series(a + 2, a + 3) i;
EXCEPTION WHEN division_by_zero THEN
INSERT INTO t1 SELECT i, i + 1 FROM generate_series(a + 2, a + 3) i;
END;
$$ LANGUAGE plpgsql;
SELECT f(10);
SELECT f(0), f(20);
SELECT * FROM t1 ORDER BY a, b;
DROP FUNCTION f(int);
DROP TABLE t1, t2, t3, t4;

View File

@ -23,6 +23,7 @@ WHERE a.relfilenode = b.relfilenode AND b.relname = 't';
BEGIN;
SAVEPOINT s0;
INSERT INTO t SELECT i, i+1 FROM generate_series(1, 10) i;
SELECT count(*) FROM t; -- force flush
SAVEPOINT s1;
INSERT INTO t SELECT i, i+1 FROM generate_series(1, 10) i;
SELECT count(*) FROM t;

View File

@ -0,0 +1,70 @@
--
-- Testing we handle transactions properly
--
CREATE TABLE t(a int, b int) USING cstore_tableam;
INSERT INTO t SELECT i, 2 * i FROM generate_series(1, 3) i;
SELECT * FROM t ORDER BY a;
-- verify that table rewrites work properly
BEGIN;
ALTER TABLE t ALTER COLUMN b TYPE float4 USING (b + 0.5)::float4;
INSERT INTO t VALUES (4, 8.5);
SELECT * FROM t ORDER BY a;
ROLLBACK;
SELECT * FROM t ORDER BY a;
-- verify truncate rollback
BEGIN;
TRUNCATE t;
INSERT INTO t VALUES (4, 8);
SELECT * FROM t ORDER BY a;
SAVEPOINT s1;
TRUNCATE t;
SELECT * FROM t ORDER BY a;
ROLLBACK TO SAVEPOINT s1;
SELECT * FROM t ORDER BY a;
ROLLBACK;
-- verify truncate with unflushed data in upper xacts
BEGIN;
INSERT INTO t VALUES (4, 8);
SAVEPOINT s1;
TRUNCATE t;
ROLLBACK TO SAVEPOINT s1;
COMMIT;
SELECT * FROM t ORDER BY a;
-- verify DROP TABLE rollback
BEGIN;
INSERT INTO t VALUES (5, 10);
SELECT * FROM t ORDER BY a;
SAVEPOINT s1;
DROP TABLE t;
SELECT * FROM t ORDER BY a;
ROLLBACK TO SAVEPOINT s1;
SELECT * FROM t ORDER BY a;
ROLLBACK;
-- verify DROP TABLE with unflushed data in upper xacts
BEGIN;
INSERT INTO t VALUES (5, 10);
SAVEPOINT s1;
DROP TABLE t;
SELECT * FROM t ORDER BY a;
ROLLBACK TO SAVEPOINT s1;
COMMIT;
SELECT * FROM t ORDER BY a;
-- verify SELECT when unflushed data in upper transactions errors.
BEGIN;
INSERT INTO t VALUES (6, 12);
SAVEPOINT s1;
SELECT * FROM t;
ROLLBACK;
SELECT * FROM t ORDER BY a;
DROP TABLE t;

View File

@ -58,4 +58,87 @@ create trigger tr_after_row after insert on test_tr
insert into test_tr values(1);
insert into test_tr values(2),(3),(4);
SELECT * FROM test_tr ORDER BY i;
drop table test_tr;
create table test_tr(i int) using cstore_tableam;
-- we should be able to clean-up and continue gracefully if we
-- error out in AFTER STATEMENT triggers.
CREATE SEQUENCE counter START 100;
create or replace function trs_after_erroring() returns trigger language plpgsql as $$
BEGIN
IF nextval('counter') % 2 = 0 THEN
RAISE EXCEPTION '%', 'error';
END IF;
RETURN NULL;
END;
$$;
create trigger tr_after_stmt_erroring after insert on test_tr
referencing new table as new_table
for each statement execute procedure trs_after_erroring();
--
-- Once upon a time we didn't clean-up properly after erroring out. Here the first
-- statement errors, but the second succeeds. In old times, because of failure in
-- clean-up, both rows were visible. But only the 2nd one should be visible.
--
insert into test_tr values(5);
insert into test_tr values(6);
SELECT * FROM test_tr ORDER BY i;
drop table test_tr;
--
-- https://github.com/citusdata/cstore2/issues/32
--
create table events(
user_id bigint,
event_id bigint,
event_time timestamp default now(),
value float default random())
PARTITION BY RANGE (event_time);
create table events_p2020_11_04_102965
PARTITION OF events FOR VALUES FROM ('2020-11-04 00:00:00+01') TO ('2020-11-05 00:00:00+01')
USING cstore_tableam;
create table events_trigger_target(
user_id bigint,
avg float,
__count__ bigint
) USING cstore_tableam;
CREATE OR REPLACE FUNCTION user_value_by_day()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
BEGIN
IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN
EXECUTE format($exec_format$INSERT INTO %s AS __mat__ SELECT user_id, 0.1 AS avg, pg_catalog.count(*) AS __count__ FROM __ins__ events GROUP BY user_id;
$exec_format$, TG_ARGV[0]);
END IF;
IF (TG_OP = 'DELETE' OR TG_OP = 'UPDATE') THEN
RAISE EXCEPTION $ex$MATERIALIZED VIEW 'user_value_by_day' on table 'events' does not support UPDATE/DELETE$ex$;
END IF;
IF (TG_OP = 'TRUNCATE') THEN
EXECUTE format($exec_format$TRUNCATE TABLE %s; $exec_format$, TG_ARGV[0]);
END IF;
RETURN NULL;
END;
$function$;
create trigger "user_value_by_day_INSERT" AFTER INSERT ON events
REFERENCING NEW TABLE AS __ins__
FOR EACH STATEMENT EXECUTE FUNCTION user_value_by_day('events_trigger_target');
COPY events FROM STDIN WITH (FORMAT 'csv');
1,1,"2020-11-04 15:54:02.226999-08",1.1
2,3,"2020-11-04 16:54:02.226999-08",2.2
\.
SELECT * FROM events ORDER BY user_id;
SELECT * FROM events_trigger_target ORDER BY user_id;
DROP TABLE events;