Merge pull request #4319 from citusdata/cstore_write_state_management

Implements write state management for tuple inserts.
Hadi Moshayedi 2020-11-17 12:17:51 -08:00 committed by GitHub
commit 3088ccd62a
16 changed files with 1354 additions and 140 deletions
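
The core of the change is the new write_state_management.c, shown in full below: instead of a single global write state torn down in an ExecutorEnd hook, every relfilenode now carries a stack of per-subtransaction write states that is flushed on (sub)transaction commit and discarded on abort. What follows is a minimal, self-contained C sketch of that bookkeeping outside PostgreSQL; the names mirror the patch, but the map is a plain linked list rather than dynahash, the write state is reduced to a pending-row counter, and dropped-table tracking is omitted.

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

typedef unsigned int Oid;
typedef unsigned int SubTransactionId;

/* one stack node per (relfilenode, subtransaction) that saw inserts */
typedef struct SubXidWriteState
{
	SubTransactionId subXid;
	int pendingRows;                   /* stand-in for TableWriteState */
	struct SubXidWriteState *next;     /* next entry is an upper subxact */
} SubXidWriteState;

/* one map entry per relfilenode written in the current transaction */
typedef struct WriteStateMapEntry
{
	Oid relfilenode;
	SubXidWriteState *writeStateStack; /* head belongs to the newest subxact */
	struct WriteStateMapEntry *next;   /* the patch uses dynahash instead */
} WriteStateMapEntry;

static WriteStateMapEntry *WriteStateMap = NULL;

/* mirrors cstore_init_write_state(): reuse the top of the stack when it
 * belongs to the current subxact, otherwise push a fresh write state */
static SubXidWriteState *
init_write_state(Oid relfilenode, SubTransactionId subXid)
{
	WriteStateMapEntry *entry = WriteStateMap;
	while (entry != NULL && entry->relfilenode != relfilenode)
	{
		entry = entry->next;
	}
	if (entry == NULL)
	{
		entry = calloc(1, sizeof(WriteStateMapEntry));
		entry->relfilenode = relfilenode;
		entry->next = WriteStateMap;
		WriteStateMap = entry;
	}
	if (entry->writeStateStack != NULL &&
		entry->writeStateStack->subXid == subXid)
	{
		return entry->writeStateStack;
	}

	SubXidWriteState *state = calloc(1, sizeof(SubXidWriteState));
	state->subXid = subXid;
	state->next = entry->writeStateStack;
	entry->writeStateStack = state;
	return state;
}

/* mirrors PopWriteStateForAllRels(): at subxact end, pop the write states
 * owned by that subxact; flush them on commit, throw them away on abort */
static void
pop_write_states(SubTransactionId subXid, bool commit)
{
	for (WriteStateMapEntry *entry = WriteStateMap; entry; entry = entry->next)
	{
		SubXidWriteState *head = entry->writeStateStack;
		if (head != NULL && head->subXid == subXid)
		{
			if (commit)
			{
				printf("flushing %d rows for relfilenode %u\n",
				       head->pendingRows, entry->relfilenode);
			}
			entry->writeStateStack = head->next;
			free(head);
		}
	}
}

int
main(void)
{
	init_write_state(16384, 1)->pendingRows += 5; /* top-level INSERT */
	init_write_state(16384, 2)->pendingRows += 3; /* INSERT after SAVEPOINT */
	pop_write_states(2, false); /* ROLLBACK TO SAVEPOINT: 3 rows discarded */
	pop_write_states(1, true);  /* COMMIT: only the 5 rows get flushed */
	return 0;
}

Run top to bottom, this prints a single flush of 5 rows: the savepoint's 3 rows never reach storage, which is the contract the transaction callbacks in the patch enforce.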


@ -549,7 +549,7 @@ CopyIntoCStoreTable(const CopyStmt *copyStatement, const char *queryString)
#endif
/* init state to write to the cstore file */
writeState = CStoreBeginWrite(relation,
writeState = CStoreBeginWrite(relation->rd_node,
cstoreOptions->compressionType,
cstoreOptions->stripeRowCount,
cstoreOptions->blockRowCount,
@ -1992,13 +1992,16 @@ CStoreBeginForeignInsert(ModifyTableState *modifyTableState, ResultRelInfo *rela
CStoreOptions *cstoreOptions = CStoreGetOptions(foreignTableOid);
TupleDesc tupleDescriptor = RelationGetDescr(relationInfo->ri_RelationDesc);
TableWriteState *writeState = CStoreBeginWrite(relation,
TableWriteState *writeState = CStoreBeginWrite(relation->rd_node,
cstoreOptions->compressionType,
cstoreOptions->stripeRowCount,
cstoreOptions->blockRowCount,
tupleDescriptor);
relationInfo->ri_FdwState = (void *) writeState;
/* keep the lock */
relation_close(relation, NoLock);
}
@ -2055,10 +2058,7 @@ CStoreEndForeignInsert(EState *executorState, ResultRelInfo *relationInfo)
/* writeState is NULL during Explain queries */
if (writeState != NULL)
{
Relation relation = writeState->relation;
CStoreEndWrite(writeState);
heap_close(relation, RowExclusiveLock);
}
}
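
The pattern in the hunks above repeats throughout the patch: CStoreBeginWrite is now keyed by physical storage identity (RelFileNode) instead of an open Relation, so a write state can outlive any one relcache entry and any one statement. A hypothetical call site changes like this:

/* before: the write state pinned the Relation until CStoreEndWrite() */
writeState = CStoreBeginWrite(relation,
                              cstoreOptions->compressionType,
                              cstoreOptions->stripeRowCount,
                              cstoreOptions->blockRowCount,
                              tupleDescriptor);

/* after: only the RelFileNode is captured, CStoreBeginWrite copies the
 * tuple descriptor for the same lifetime reason, and the caller keeps
 * its lock on the relation rather than releasing it per statement */
writeState = CStoreBeginWrite(relation->rd_node,
                              cstoreOptions->compressionType,
                              cstoreOptions->stripeRowCount,
                              cstoreOptions->blockRowCount,
                              tupleDescriptor);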


@ -134,18 +134,19 @@ InitCStoreDataFileMetadata(Oid relfilenode, int blockRowCount, int stripeRowCoun
{
NameData compressionName = { 0 };
namestrcpy(&compressionName, CompressionTypeStr(compression));
bool nulls[Natts_cstore_data_files] = { 0 };
Datum values[Natts_cstore_data_files] = {
ObjectIdGetDatum(relfilenode),
Int32GetDatum(blockRowCount),
Int32GetDatum(stripeRowCount),
NameGetDatum(&compressionName),
0, /* to be filled below */
Int32GetDatum(CSTORE_VERSION_MAJOR),
Int32GetDatum(CSTORE_VERSION_MINOR)
};
namestrcpy(&compressionName, CompressionTypeStr(compression));
values[Anum_cstore_data_files_compression - 1] = NameGetDatum(&compressionName);
DeleteDataFileMetadataRowIfExists(relfilenode);
Oid cstoreDataFilesOid = CStoreDataFilesRelationId();
@ -171,6 +172,7 @@ UpdateCStoreDataFileMetadata(Oid relfilenode, int blockRowCount, int stripeRowCo
Datum values[Natts_cstore_data_files] = { 0 };
bool isnull[Natts_cstore_data_files] = { 0 };
bool replace[Natts_cstore_data_files] = { 0 };
bool changed = false;
Relation cstoreDataFiles = heap_open(CStoreDataFilesRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(cstoreDataFiles);
@ -192,7 +194,6 @@ UpdateCStoreDataFileMetadata(Oid relfilenode, int blockRowCount, int stripeRowCo
Form_cstore_data_files metadata = (Form_cstore_data_files) GETSTRUCT(heapTuple);
bool changed = false;
if (metadata->block_row_count != blockRowCount)
{
values[Anum_cstore_data_files_block_row_count - 1] = Int32GetDatum(blockRowCount);
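
The hunks above hoist the values/isnull/replace arrays and the changed flag to the top of UpdateCStoreDataFileMetadata. For context, this is the usual catalog in-place update idiom those arrays feed into; a hedged sketch, since the rest of the function body is not shown in this diff:

/* mark the column to change, then build and apply the updated tuple */
values[Anum_cstore_data_files_block_row_count - 1] = Int32GetDatum(blockRowCount);
isnull[Anum_cstore_data_files_block_row_count - 1] = false;
replace[Anum_cstore_data_files_block_row_count - 1] = true;
changed = true;

HeapTuple newTuple = heap_modify_tuple(heapTuple, tupleDescriptor,
                                       values, isnull, replace);
CatalogTupleUpdate(cstoreDataFiles, &newTuple->t_self, newTuple);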


@ -68,10 +68,7 @@ typedef struct CStoreScanDescData
typedef struct CStoreScanDescData *CStoreScanDesc;
static TableWriteState *CStoreWriteState = NULL;
static ExecutorEnd_hook_type PreviousExecutorEndHook = NULL;
static MemoryContext CStoreContext = NULL;
static object_access_hook_type prevObjectAccessHook = NULL;
static object_access_hook_type PrevObjectAccessHook = NULL;
static ProcessUtility_hook_type PreviousProcessUtilityHook = NULL;
/* forward declaration for static functions */
@ -122,13 +119,13 @@ CStoreTableAMDefaultOptions()
* CStoreTableAMGetOptions returns the options for the given relfilenode,
* which is expected to belong to a cstore table am table; otherwise an
* error is raised.
*/
static CStoreOptions *
CStoreTableAMGetOptions(Relation rel)
CStoreOptions *
CStoreTableAMGetOptions(Oid relfilenode)
{
Assert(rel != NULL);
Assert(OidIsValid(relfilenode));
CStoreOptions *cstoreOptions = palloc0(sizeof(CStoreOptions));
DataFileMetadata *metadata = ReadDataFileMetadata(rel->rd_node.relNode, false);
DataFileMetadata *metadata = ReadDataFileMetadata(relfilenode, false);
cstoreOptions->compressionType = metadata->compression;
cstoreOptions->stripeRowCount = metadata->stripeRowCount;
cstoreOptions->blockRowCount = metadata->blockRowCount;
@ -136,66 +133,6 @@ CStoreTableAMGetOptions(Relation rel)
}
static MemoryContext
GetCStoreMemoryContext()
{
if (CStoreContext == NULL)
{
CStoreContext = AllocSetContextCreate(TopMemoryContext, "cstore context",
ALLOCSET_DEFAULT_SIZES);
}
return CStoreContext;
}
static void
ResetCStoreMemoryContext()
{
if (CStoreContext != NULL)
{
MemoryContextReset(CStoreContext);
}
}
static void
cstore_init_write_state(Relation relation)
{
if (CStoreWriteState != NULL)
{
/* TODO: consider whether it's possible for a new write to start */
/* before an old one is flushed */
Assert(CStoreWriteState->relation->rd_id == relation->rd_id);
}
if (CStoreWriteState == NULL)
{
CStoreOptions *cstoreOptions = CStoreTableAMGetOptions(relation);
TupleDesc tupdesc = RelationGetDescr(relation);
elog(LOG, "initializing write state for relation %d", relation->rd_id);
CStoreWriteState = CStoreBeginWrite(relation,
cstoreOptions->compressionType,
cstoreOptions->stripeRowCount,
cstoreOptions->blockRowCount,
tupdesc);
}
}
static void
cstore_free_write_state()
{
if (CStoreWriteState != NULL)
{
elog(LOG, "flushing write state for relation %d",
CStoreWriteState->relation->rd_id);
CStoreEndWrite(CStoreWriteState);
CStoreWriteState = NULL;
}
}
static List *
RelationColumnList(Relation rel)
{
@ -263,9 +200,10 @@ cstore_beginscan_extended(Relation relation, Snapshot snapshot,
uint32 flags, Bitmapset *attr_needed, List *scanQual)
{
TupleDesc tupdesc = relation->rd_att;
Oid relfilenode = relation->rd_node.relNode;
CStoreScanDesc scan = palloc(sizeof(CStoreScanDescData));
List *neededColumnList = NIL;
MemoryContext oldContext = MemoryContextSwitchTo(GetCStoreMemoryContext());
MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext);
ListCell *columnCell = NULL;
scan->cs_base.rs_rd = relation;
@ -275,6 +213,15 @@ cstore_beginscan_extended(Relation relation, Snapshot snapshot,
scan->cs_base.rs_flags = flags;
scan->cs_base.rs_parallel = parallel_scan;
if (PendingWritesInUpperTransactions(relfilenode, GetCurrentSubTransactionId()))
{
elog(ERROR,
"cannot read from table when there is unflushed data in upper transactions");
}
FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId());
List *columnList = RelationColumnList(relation);
/* only collect columns that we need for the scan */
@ -319,7 +266,7 @@ static bool
cstore_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
{
CStoreScanDesc scan = (CStoreScanDesc) sscan;
MemoryContext oldContext = MemoryContextSwitchTo(GetCStoreMemoryContext());
MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext);
ExecClearTuple(slot);
@ -437,9 +384,9 @@ static void
cstore_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
int options, BulkInsertState bistate)
{
MemoryContext oldContext = MemoryContextSwitchTo(GetCStoreMemoryContext());
cstore_init_write_state(relation);
TableWriteState *writeState = cstore_init_write_state(relation->rd_node,
RelationGetDescr(relation),
GetCurrentSubTransactionId());
HeapTuple heapTuple = ExecCopySlotHeapTuple(slot);
if (HeapTupleHasExternal(heapTuple))
@ -453,8 +400,7 @@ cstore_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
slot_getallattrs(slot);
CStoreWriteRow(CStoreWriteState, slot->tts_values, slot->tts_isnull);
MemoryContextSwitchTo(oldContext);
CStoreWriteRow(writeState, slot->tts_values, slot->tts_isnull);
}
@ -479,9 +425,9 @@ static void
cstore_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
CommandId cid, int options, BulkInsertState bistate)
{
MemoryContext oldContext = MemoryContextSwitchTo(GetCStoreMemoryContext());
cstore_init_write_state(relation);
TableWriteState *writeState = cstore_init_write_state(relation->rd_node,
RelationGetDescr(relation),
GetCurrentSubTransactionId());
for (int i = 0; i < ntuples; i++)
{
@ -499,9 +445,8 @@ cstore_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
slot_getallattrs(tupleSlot);
CStoreWriteRow(CStoreWriteState, tupleSlot->tts_values, tupleSlot->tts_isnull);
CStoreWriteRow(writeState, tupleSlot->tts_values, tupleSlot->tts_isnull);
}
MemoryContextSwitchTo(oldContext);
}
@ -537,11 +482,9 @@ cstore_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot,
static void
cstore_finish_bulk_insert(Relation relation, int options)
{
/*TODO: flush relation like for heap? */
/* free write state or only in ExecutorEnd_hook? */
/* for COPY */
cstore_free_write_state();
/*
* Nothing to do here. We keep write states live until transaction end.
*/
}
@ -556,6 +499,9 @@ cstore_relation_set_new_filenode(Relation rel,
uint64 blockRowCount = 0;
uint64 stripeRowCount = 0;
CompressionType compression = 0;
Oid oldRelfilenode = rel->rd_node.relNode;
MarkRelfilenodeDropped(oldRelfilenode, GetCurrentSubTransactionId());
if (metadata != NULL)
{
@ -589,7 +535,10 @@ cstore_relation_set_new_filenode(Relation rel,
static void
cstore_relation_nontransactional_truncate(Relation rel)
{
DataFileMetadata *metadata = ReadDataFileMetadata(rel->rd_node.relNode, false);
Oid relfilenode = rel->rd_node.relNode;
DataFileMetadata *metadata = ReadDataFileMetadata(relfilenode, false);
NonTransactionDropWriteState(relfilenode);
/*
* No need to set new relfilenode, since the table was created in this
@ -651,23 +600,23 @@ cstore_relation_copy_for_cluster(Relation OldHeap, Relation NewHeap,
* relation first.
*/
CStoreOptions *cstoreOptions = CStoreTableAMGetOptions(OldHeap);
CStoreOptions *cstoreOptions = CStoreTableAMGetOptions(OldHeap->rd_node.relNode);
UpdateCStoreDataFileMetadata(NewHeap->rd_node.relNode,
cstoreOptions->blockRowCount,
cstoreOptions->stripeRowCount,
cstoreOptions->compressionType);
cstoreOptions = CStoreTableAMGetOptions(NewHeap);
cstoreOptions = CStoreTableAMGetOptions(NewHeap->rd_node.relNode);
TableWriteState *writeState = CStoreBeginWrite(NewHeap,
TableWriteState *writeState = CStoreBeginWrite(NewHeap->rd_node,
cstoreOptions->compressionType,
cstoreOptions->stripeRowCount,
cstoreOptions->blockRowCount,
targetDesc);
TableReadState *readState = CStoreBeginRead(OldHeap, sourceDesc, RelationColumnList(
OldHeap), NULL);
TableReadState *readState = CStoreBeginRead(OldHeap, sourceDesc,
RelationColumnList(OldHeap), NULL);
Datum *values = palloc0(sourceDesc->natts * sizeof(Datum));
bool *nulls = palloc0(sourceDesc->natts * sizeof(bool));
@ -1046,18 +995,61 @@ cstore_scan_sample_next_tuple(TableScanDesc scan, SampleScanState *scanstate,
static void
CStoreExecutorEnd(QueryDesc *queryDesc)
CStoreXactCallback(XactEvent event, void *arg)
{
cstore_free_write_state();
if (PreviousExecutorEndHook)
switch (event)
{
PreviousExecutorEndHook(queryDesc);
case XACT_EVENT_COMMIT:
case XACT_EVENT_PARALLEL_COMMIT:
case XACT_EVENT_PREPARE:
{
/* nothing to do */
break;
}
case XACT_EVENT_ABORT:
case XACT_EVENT_PARALLEL_ABORT:
{
DiscardWriteStateForAllRels(GetCurrentSubTransactionId(), 0);
break;
}
case XACT_EVENT_PRE_COMMIT:
case XACT_EVENT_PARALLEL_PRE_COMMIT:
case XACT_EVENT_PRE_PREPARE:
{
FlushWriteStateForAllRels(GetCurrentSubTransactionId(), 0);
break;
}
}
else
}
static void
CStoreSubXactCallback(SubXactEvent event, SubTransactionId mySubid,
SubTransactionId parentSubid, void *arg)
{
switch (event)
{
standard_ExecutorEnd(queryDesc);
case SUBXACT_EVENT_START_SUB:
case SUBXACT_EVENT_COMMIT_SUB:
{
/* nothing to do */
break;
}
case SUBXACT_EVENT_ABORT_SUB:
{
DiscardWriteStateForAllRels(mySubid, parentSubid);
break;
}
case SUBXACT_EVENT_PRE_COMMIT_SUB:
{
FlushWriteStateForAllRels(mySubid, parentSubid);
break;
}
}
ResetCStoreMemoryContext();
}
@ -1109,12 +1101,13 @@ CStoreTableAMProcessUtility(PlannedStmt * plannedStatement,
void
cstore_tableam_init()
{
PreviousExecutorEndHook = ExecutorEnd_hook;
ExecutorEnd_hook = CStoreExecutorEnd;
RegisterXactCallback(CStoreXactCallback, NULL);
RegisterSubXactCallback(CStoreSubXactCallback, NULL);
PreviousProcessUtilityHook = (ProcessUtility_hook != NULL) ?
ProcessUtility_hook : standard_ProcessUtility;
ProcessUtility_hook = CStoreTableAMProcessUtility;
prevObjectAccessHook = object_access_hook;
PrevObjectAccessHook = object_access_hook;
object_access_hook = CStoreTableAMObjectAccessHook;
cstore_customscan_init();
@ -1124,7 +1117,7 @@ cstore_tableam_init()
void
cstore_tableam_finish()
{
ExecutorEnd_hook = PreviousExecutorEndHook;
object_access_hook = PrevObjectAccessHook;
}
@ -1140,9 +1133,9 @@ CStoreTableAMObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId
subId,
void *arg)
{
if (prevObjectAccessHook)
if (PrevObjectAccessHook)
{
prevObjectAccessHook(access, classId, objectId, subId, arg);
PrevObjectAccessHook(access, classId, objectId, subId, arg);
}
/*
@ -1166,7 +1159,10 @@ CStoreTableAMObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId
* tableam tables storage is managed by postgres.
*/
Relation rel = table_open(objectId, AccessExclusiveLock);
DeleteDataFileMetadataRowIfExists(rel->rd_node.relNode);
Oid relfilenode = rel->rd_node.relNode;
DeleteDataFileMetadataRowIfExists(relfilenode);
MarkRelfilenodeDropped(relfilenode, GetCurrentSubTransactionId());
/* keep the lock since we did physical changes to the relation */
table_close(rel, NoLock);
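
The hook changes above are the other half of the design: the ExecutorEnd hook that used to flush the one global write state is gone, replaced by transaction callbacks. Flushing happens at PRE_COMMIT rather than COMMIT because writing out stripes can still ERROR, and that must happen while the transaction is still able to abort cleanly. A stripped-down sketch of the wiring, assuming a regular PostgreSQL extension build; the commented-out calls stand in for the patch's FlushWriteStateForAllRels and DiscardWriteStateForAllRels:

#include "postgres.h"
#include "access/xact.h"

static void
SketchXactCallback(XactEvent event, void *arg)
{
	switch (event)
	{
		case XACT_EVENT_PRE_COMMIT:
		case XACT_EVENT_PRE_PREPARE:
			/* still inside the transaction, so flushing may ERROR safely */
			/* FlushWriteStateForAllRels(GetCurrentSubTransactionId(), 0); */
			break;

		case XACT_EVENT_ABORT:
			/* too late to ERROR; just drop the in-memory state */
			/* DiscardWriteStateForAllRels(GetCurrentSubTransactionId(), 0); */
			break;

		default:
			break;
	}
}

static void
SketchSubXactCallback(SubXactEvent event, SubTransactionId mySubid,
					  SubTransactionId parentSubid, void *arg)
{
	if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
	{
		/* flush this subxact's pending writes, as the patch does */
	}
	else if (event == SUBXACT_EVENT_ABORT_SUB)
	{
		/* discard this subxact's pending writes */
	}
}

void
sketch_init(void)
{
	/* callbacks persist for the backend's lifetime; register them once */
	RegisterXactCallback(SketchXactCallback, NULL);
	RegisterSubXactCallback(SketchSubXactCallback, NULL);
}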


@ -18,6 +18,7 @@
#include "safe_lib.h"
#include "access/heapam.h"
#include "access/nbtree.h"
#include "catalog/pg_am.h"
#include "miscadmin.h"
@ -25,6 +26,7 @@
#include "storage/smgr.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/relfilenodemap.h"
#include "columnar/cstore.h"
#include "columnar/cstore_version_compat.h"
@ -58,7 +60,7 @@ static StringInfo CopyStringInfo(StringInfo sourceString);
* will be added.
*/
TableWriteState *
CStoreBeginWrite(Relation relation,
CStoreBeginWrite(RelFileNode relfilenode,
CompressionType compressionType,
uint64 stripeMaxRowCount, uint32 blockRowCount,
TupleDesc tupleDescriptor)
@ -101,11 +103,11 @@ CStoreBeginWrite(Relation relation,
blockRowCount);
TableWriteState *writeState = palloc0(sizeof(TableWriteState));
writeState->relation = relation;
writeState->relfilenode = relfilenode;
writeState->compressionType = compressionType;
writeState->stripeMaxRowCount = stripeMaxRowCount;
writeState->blockRowCount = blockRowCount;
writeState->tupleDescriptor = tupleDescriptor;
writeState->tupleDescriptor = CreateTupleDescCopy(tupleDescriptor);
writeState->comparisonFunctionArray = comparisonFunctionArray;
writeState->stripeBuffers = NULL;
writeState->stripeSkipList = NULL;
@ -205,11 +207,7 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul
stripeBuffers->rowCount++;
if (stripeBuffers->rowCount >= writeState->stripeMaxRowCount)
{
FlushStripe(writeState);
/* set stripe data and skip list to NULL so they are recreated next time */
writeState->stripeBuffers = NULL;
writeState->stripeSkipList = NULL;
CStoreFlushPendingWrites(writeState);
}
MemoryContextSwitchTo(oldContext);
@ -225,17 +223,7 @@ CStoreWriteRow(TableWriteState *writeState, Datum *columnValues, bool *columnNul
void
CStoreEndWrite(TableWriteState *writeState)
{
StripeBuffers *stripeBuffers = writeState->stripeBuffers;
if (stripeBuffers != NULL)
{
MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext);
FlushStripe(writeState);
MemoryContextReset(writeState->stripeWriteContext);
MemoryContextSwitchTo(oldContext);
}
CStoreFlushPendingWrites(writeState);
MemoryContextDelete(writeState->stripeWriteContext);
pfree(writeState->comparisonFunctionArray);
@ -244,6 +232,25 @@ CStoreEndWrite(TableWriteState *writeState)
}
void
CStoreFlushPendingWrites(TableWriteState *writeState)
{
StripeBuffers *stripeBuffers = writeState->stripeBuffers;
if (stripeBuffers != NULL)
{
MemoryContext oldContext = MemoryContextSwitchTo(writeState->stripeWriteContext);
FlushStripe(writeState);
/* set stripe data and skip list to NULL so they are recreated next time */
writeState->stripeBuffers = NULL;
writeState->stripeSkipList = NULL;
MemoryContextSwitchTo(oldContext);
}
}
/*
* CreateEmptyStripeBuffers allocates an empty StripeBuffers structure with the given
* column count.
@ -410,6 +417,10 @@ FlushStripe(TableWriteState *writeState)
uint64 stripeSize = 0;
uint64 stripeRowCount = 0;
Oid relationId = RelidByRelfilenode(writeState->relfilenode.spcNode,
writeState->relfilenode.relNode);
Relation relation = relation_open(relationId, NoLock);
/*
* check if the last block needs serialization; the last block was not
* serialized if it was not full yet (i.e. rowCount > 0)
@ -459,7 +470,7 @@ FlushStripe(TableWriteState *writeState)
stripeSkipList->blockSkipNodeArray[0][blockIndex].rowCount;
}
stripeMetadata = ReserveStripe(writeState->relation, stripeSize,
stripeMetadata = ReserveStripe(relation, stripeSize,
stripeRowCount, columnCount, blockCount,
blockRowCount);
@ -486,7 +497,7 @@ FlushStripe(TableWriteState *writeState)
columnBuffers->blockBuffersArray[blockIndex];
StringInfo existsBuffer = blockBuffers->existsBuffer;
WriteToSmgr(writeState->relation, currentFileOffset,
WriteToSmgr(relation, currentFileOffset,
existsBuffer->data, existsBuffer->len);
currentFileOffset += existsBuffer->len;
}
@ -497,16 +508,18 @@ FlushStripe(TableWriteState *writeState)
columnBuffers->blockBuffersArray[blockIndex];
StringInfo valueBuffer = blockBuffers->valueBuffer;
WriteToSmgr(writeState->relation, currentFileOffset,
WriteToSmgr(relation, currentFileOffset,
valueBuffer->data, valueBuffer->len);
currentFileOffset += valueBuffer->len;
}
}
/* create skip list and footer buffers */
SaveStripeSkipList(writeState->relation->rd_node.relNode,
SaveStripeSkipList(relation->rd_node.relNode,
stripeMetadata.id,
stripeSkipList, tupleDescriptor);
relation_close(relation, NoLock);
}
@ -747,3 +760,10 @@ CopyStringInfo(StringInfo sourceString)
return targetString;
}
bool
ContainsPendingWrites(TableWriteState *state)
{
return state->stripeBuffers != NULL && state->stripeBuffers->rowCount != 0;
}
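
Because the write state now stores only a RelFileNode, FlushStripe has to map back to the table's OID and reopen the relation at flush time, which is why utils/relfilenodemap.h is newly included above. The lookup idiom, as used in the FlushStripe hunks; NoLock relies on the original caller still holding its lock on the table:

/* map (tablespace, relfilenode) back to the table's OID ... */
Oid relationId = RelidByRelfilenode(writeState->relfilenode.spcNode,
                                    writeState->relfilenode.relNode);

/* ... and reopen it without taking a new lock: whoever created this
 * write state locked the table and keeps that lock until commit */
Relation relation = relation_open(relationId, NoLock);
/* WriteToSmgr(relation, ...); SaveStripeSkipList(...); */
relation_close(relation, NoLock);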


@ -0,0 +1,384 @@
#include "citus_version.h"
#if HAS_TABLEAM
#include "postgres.h"
#include <math.h>
#include "miscadmin.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/multixact.h"
#include "access/rewriteheap.h"
#include "access/tsmapi.h"
#if PG_VERSION_NUM >= 130000
#include "access/heaptoast.h"
#else
#include "access/tuptoaster.h"
#endif
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/index.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_am.h"
#include "catalog/pg_trigger.h"
#include "catalog/storage.h"
#include "catalog/storage_xlog.h"
#include "commands/progress.h"
#include "commands/vacuum.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"
#include "optimizer/plancat.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "utils/pg_rusage.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "columnar/cstore.h"
#include "columnar/cstore_customscan.h"
#include "columnar/cstore_tableam.h"
#include "columnar/cstore_version_compat.h"
/*
* Mapping from relfilenode to WriteStateMapEntry. This keeps write state for
* each relation.
*/
static HTAB *WriteStateMap = NULL;
/* memory context for allocating WriteStateMap & all write states */
static MemoryContext WriteStateContext = NULL;
/*
* Each member of the writeStateStack in WriteStateMapEntry. An entry means
* that we did some inserts in the subtransaction subXid, and the state of
* those inserts is stored in writeState. Those writes can be flushed or
* unflushed.
*/
typedef struct SubXidWriteState
{
SubTransactionId subXid;
TableWriteState *writeState;
struct SubXidWriteState *next;
} SubXidWriteState;
/*
* An entry in WriteStateMap.
*/
typedef struct WriteStateMapEntry
{
/* key of the entry */
Oid relfilenode;
/*
* If a table is dropped, we set dropped to true and set dropSubXid to the
* id of the subtransaction in which the drop happened.
*/
bool dropped;
SubTransactionId dropSubXid;
/*
* Stack of SubXidWriteState where first element is top of the stack. When
* inserts happen, we look at top of the stack. If top of stack belongs to
* current subtransaction, we forward writes to its writeState. Otherwise,
* we create a new stack entry for current subtransaction and push it to
* the stack, and forward writes to that.
*/
SubXidWriteState *writeStateStack;
} WriteStateMapEntry;
/*
* Memory context reset callback so we reset WriteStateMap to NULL at the end
* of transaction. WriteStateMap is allocated in WriteStateContext, so a
* reference that outlives the context reset would point to freed memory.
*/
static MemoryContextCallback cleanupCallback;
static void
CleanupWriteStateMap(void *arg)
{
WriteStateMap = NULL;
WriteStateContext = NULL;
}
TableWriteState *
cstore_init_write_state(RelFileNode relfilenode, TupleDesc tupdesc,
SubTransactionId currentSubXid)
{
bool found;
/*
* If this is the first call in current transaction, allocate the hash
* table.
*/
if (WriteStateMap == NULL)
{
WriteStateContext =
AllocSetContextCreate(
TopTransactionContext,
"Column Store Write State Management Context",
ALLOCSET_DEFAULT_SIZES);
HASHCTL info;
uint32 hashFlags = (HASH_ELEM | HASH_CONTEXT);
memset(&info, 0, sizeof(info));
info.keysize = sizeof(Oid);
info.entrysize = sizeof(WriteStateMapEntry);
info.hcxt = WriteStateContext;
WriteStateMap = hash_create("column store write state map",
64, &info, hashFlags);
cleanupCallback.arg = NULL;
cleanupCallback.func = &CleanupWriteStateMap;
cleanupCallback.next = NULL;
MemoryContextRegisterResetCallback(WriteStateContext, &cleanupCallback);
}
WriteStateMapEntry *hashEntry = hash_search(WriteStateMap, &relfilenode.relNode,
HASH_ENTER, &found);
if (!found)
{
hashEntry->writeStateStack = NULL;
hashEntry->dropped = false;
}
Assert(!hashEntry->dropped);
/*
* If top of stack belongs to the current subtransaction, return its
* writeState, ...
*/
if (hashEntry->writeStateStack != NULL)
{
SubXidWriteState *stackHead = hashEntry->writeStateStack;
if (stackHead->subXid == currentSubXid)
{
return stackHead->writeState;
}
}
/*
* ... otherwise we need to create a new stack entry for the current
* subtransaction.
*/
MemoryContext oldContext = MemoryContextSwitchTo(WriteStateContext);
CStoreOptions *cstoreOptions = CStoreTableAMGetOptions(relfilenode.relNode);
SubXidWriteState *stackEntry = palloc0(sizeof(SubXidWriteState));
stackEntry->writeState = CStoreBeginWrite(relfilenode,
cstoreOptions->compressionType,
cstoreOptions->stripeRowCount,
cstoreOptions->blockRowCount,
tupdesc);
stackEntry->subXid = currentSubXid;
stackEntry->next = hashEntry->writeStateStack;
hashEntry->writeStateStack = stackEntry;
MemoryContextSwitchTo(oldContext);
return stackEntry->writeState;
}
/*
* Flushes pending writes for given relfilenode in the given subtransaction.
*/
void
FlushWriteStateForRelfilenode(Oid relfilenode, SubTransactionId currentSubXid)
{
WriteStateMapEntry *entry;
bool found = false;
if (WriteStateMap)
{
entry = hash_search(WriteStateMap, &relfilenode, HASH_FIND, &found);
}
Assert(!found || !entry->dropped);
if (found && entry->writeStateStack != NULL)
{
SubXidWriteState *stackEntry = entry->writeStateStack;
if (stackEntry->subXid == currentSubXid)
{
CStoreFlushPendingWrites(stackEntry->writeState);
}
}
}
/*
* Helper function for FlushWriteStateForAllRels and DiscardWriteStateForAllRels.
* Pops all write states for the current subtransaction and, depending on
* "commit", either flushes or discards them. This also takes dropped tables
* into account, either propagating the dropped flag to the parent
* subtransaction (on commit) or clearing the flag (on abort).
*/
static void
PopWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId parentSubXid,
bool commit)
{
HASH_SEQ_STATUS status;
WriteStateMapEntry *entry;
if (WriteStateMap == NULL)
{
return;
}
hash_seq_init(&status, WriteStateMap);
while ((entry = hash_seq_search(&status)) != 0)
{
if (entry->writeStateStack == NULL)
{
continue;
}
/*
* If the table has been dropped in current subtransaction, either
* commit the drop or roll it back.
*/
if (entry->dropped)
{
if (entry->dropSubXid == currentSubXid)
{
if (commit)
{
/* elevate drop to the upper subtransaction */
entry->dropSubXid = parentSubXid;
}
else
{
/* abort the drop */
entry->dropped = false;
}
}
}
/*
* Otherwise, commit or discard pending writes.
*/
else
{
SubXidWriteState *stackHead = entry->writeStateStack;
if (stackHead->subXid == currentSubXid)
{
if (commit)
{
CStoreEndWrite(stackHead->writeState);
}
entry->writeStateStack = stackHead->next;
}
}
}
}
/*
* Called when current subtransaction is committed.
*/
void
FlushWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId parentSubXid)
{
PopWriteStateForAllRels(currentSubXid, parentSubXid, true);
}
/*
* Called when current subtransaction is aborted.
*/
void
DiscardWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId parentSubXid)
{
PopWriteStateForAllRels(currentSubXid, parentSubXid, false);
}
/*
* Called when the given relfilenode is dropped.
*/
void
MarkRelfilenodeDropped(Oid relfilenode, SubTransactionId currentSubXid)
{
bool found = false;
if (WriteStateMap == NULL)
{
return;
}
WriteStateMapEntry *entry = hash_search(WriteStateMap, &relfilenode, HASH_FIND,
&found);
if (!found || entry->dropped)
{
return;
}
entry->dropped = true;
entry->dropSubXid = currentSubXid;
}
/*
* Called when the given relfilenode is dropped in non-transactional TRUNCATE.
*/
void
NonTransactionDropWriteState(Oid relfilenode)
{
if (WriteStateMap)
{
hash_search(WriteStateMap, &relfilenode, HASH_REMOVE, false);
}
}
/*
* Returns true if there are any pending writes in upper transactions.
*/
bool
PendingWritesInUpperTransactions(Oid relfilenode, SubTransactionId currentSubXid)
{
WriteStateMapEntry *entry;
bool found = false;
if (WriteStateMap)
{
entry = hash_search(WriteStateMap, &relfilenode, HASH_FIND, &found);
}
if (found && entry->writeStateStack != NULL)
{
SubXidWriteState *stackEntry = entry->writeStateStack;
while (stackEntry != NULL)
{
if (stackEntry->subXid != currentSubXid &&
ContainsPendingWrites(stackEntry->writeState))
{
return true;
}
stackEntry = stackEntry->next;
}
}
return false;
}
#endif
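
Worth noting how little explicit cleanup the file above needs: WriteStateMap and every write state live in WriteStateContext, a child of TopTransactionContext, so the whole structure is freed wholesale at transaction end and the registered reset callback only has to null out the static pointers. A condensed sketch of that idiom; one deliberate deviation is passing HASH_BLOBS to request byte-wise hashing of the binary Oid key, which is an assumption here and not something this commit does:

#include "postgres.h"
#include "utils/hsearch.h"
#include "utils/memutils.h"

static HTAB *ExampleMap = NULL;
static MemoryContext ExampleContext = NULL;
static MemoryContextCallback exampleCleanup;

static void
ResetExamplePointers(void *arg)
{
	/* the context's memory is freed by the reset itself;
	 * we only drop the stale references */
	ExampleMap = NULL;
	ExampleContext = NULL;
}

static void
EnsureExampleMap(void)
{
	if (ExampleMap != NULL)
	{
		return;
	}

	ExampleContext = AllocSetContextCreate(TopTransactionContext,
										   "example map context",
										   ALLOCSET_DEFAULT_SIZES);

	HASHCTL info;
	memset(&info, 0, sizeof(info));
	info.keysize = sizeof(Oid);
	info.entrysize = sizeof(Oid);	/* a real entry embeds its key first */
	info.hcxt = ExampleContext;

	ExampleMap = hash_create("example map", 64, &info,
							 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

	exampleCleanup.func = ResetExamplePointers;
	exampleCleanup.arg = NULL;
	MemoryContextRegisterResetCallback(ExampleContext, &exampleCleanup);
}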


@ -20,6 +20,7 @@
#include "nodes/parsenodes.h"
#include "storage/bufpage.h"
#include "storage/lockdefs.h"
#include "storage/relfilenode.h"
#include "utils/relcache.h"
#include "utils/snapmgr.h"
@ -224,7 +225,7 @@ typedef struct TableWriteState
CompressionType compressionType;
TupleDesc tupleDescriptor;
FmgrInfo **comparisonFunctionArray;
Relation relation;
RelFileNode relfilenode;
MemoryContext stripeWriteContext;
StripeBuffers *stripeBuffers;
@ -251,14 +252,16 @@ extern void cstore_init(void);
extern CompressionType ParseCompressionType(const char *compressionTypeString);
/* Function declarations for writing to a cstore file */
extern TableWriteState * CStoreBeginWrite(Relation relation,
extern TableWriteState * CStoreBeginWrite(RelFileNode relfilenode,
CompressionType compressionType,
uint64 stripeMaxRowCount,
uint32 blockRowCount,
TupleDesc tupleDescriptor);
extern void CStoreWriteRow(TableWriteState *state, Datum *columnValues,
bool *columnNulls);
extern void CStoreFlushPendingWrites(TableWriteState *state);
extern void CStoreEndWrite(TableWriteState *state);
extern bool ContainsPendingWrites(TableWriteState *state);
/* Function declarations for reading from a cstore file */
extern TableReadState * CStoreBeginRead(Relation relation,
@ -281,6 +284,7 @@ extern bool CompressBuffer(StringInfo inputBuffer, StringInfo outputBuffer,
CompressionType compressionType);
extern StringInfo DecompressBuffer(StringInfo buffer, CompressionType compressionType);
extern char * CompressionTypeStr(CompressionType type);
extern CStoreOptions * CStoreTableAMGetOptions(Oid relfilenode);
/* cstore_metadata_tables.c */
extern void DeleteDataFileMetadataRowIfExists(Oid relfilenode);
@ -300,6 +304,22 @@ extern StripeSkipList * ReadStripeSkipList(Oid relfilenode, uint64 stripe,
TupleDesc tupleDescriptor,
uint32 blockCount);
/* write_state_management.c */
extern TableWriteState * cstore_init_write_state(RelFileNode relfilenode, TupleDesc
tupdesc,
SubTransactionId currentSubXid);
extern void FlushWriteStateForRelfilenode(Oid relfilenode, SubTransactionId
currentSubXid);
extern void FlushWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId
parentSubXid);
extern void DiscardWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId
parentSubXid);
extern void MarkRelfilenodeDropped(Oid relfilenode, SubTransactionId currentSubXid);
extern void NonTransactionDropWriteState(Oid relfilenode);
extern bool PendingWritesInUpperTransactions(Oid relfilenode,
SubTransactionId currentSubXid);
typedef struct SmgrAddr
{
BlockNumber blockno;


@ -16,3 +16,5 @@ test: am_block_filtering
test: am_join
test: am_trigger
test: am_tableoptions
test: am_recursive
test: am_transactions


@ -0,0 +1,252 @@
CREATE TABLE t1(a int, b int) USING cstore_tableam;
CREATE TABLE t2(a int, b int) USING cstore_tableam;
CREATE FUNCTION f(x INT) RETURNS INT AS $$
INSERT INTO t1 VALUES(x, x * 2) RETURNING b - 1;
$$ LANGUAGE SQL;
--
-- The following query starts a write to t1 before the enclosing
-- write to t2 has finished, so it tests that we handle recursive writes
-- correctly.
--
INSERT INTO t2 SELECT i, f(i) FROM generate_series(1, 5) i;
-- there are no subtransactions, so the above statement should batch
-- INSERTs inside the UDF and create one stripe per table.
SELECT relname, count(*) FROM cstore.cstore_stripes a, pg_class b
WHERE a.relfilenode=b.relfilenode AND relname IN ('t1', 't2')
GROUP BY relname
ORDER BY relname;
relname | count
---------------------------------------------------------------------
t1 | 1
t2 | 1
(2 rows)
SELECT * FROM t1 ORDER BY a;
a | b
---------------------------------------------------------------------
1 | 2
2 | 4
3 | 6
4 | 8
5 | 10
(5 rows)
SELECT * FROM t2 ORDER BY a;
a | b
---------------------------------------------------------------------
1 | 1
2 | 3
3 | 5
4 | 7
5 | 9
(5 rows)
TRUNCATE t1;
TRUNCATE t2;
DROP FUNCTION f(INT);
--
-- Test the case when 2 writes are going on concurrently in the
-- same executor, and those 2 writes are dependent.
--
WITH t AS (
INSERT INTO t1 SELECT i, 2*i FROM generate_series(1, 5) i RETURNING *
)
INSERT INTO t2 SELECT t.a, t.a+1 FROM t;
SELECT * FROM t1;
a | b
---------------------------------------------------------------------
1 | 2
2 | 4
3 | 6
4 | 8
5 | 10
(5 rows)
SELECT * FROM t2;
a | b
---------------------------------------------------------------------
1 | 2
2 | 3
3 | 4
4 | 5
5 | 6
(5 rows)
TRUNCATE t1;
TRUNCATE t2;
--
-- Test the case when there are 2 independent inserts in a CTE.
-- Also tests the case where some of the tuple_inserts happen in
-- ExecutorFinish() instead of ExecutorRun().
--
WITH t AS (
INSERT INTO t1 SELECT i, 2*i FROM generate_series(1, 5) i RETURNING *
)
INSERT INTO t2 SELECT i, (select count(*) from t1) FROM generate_series(1, 3) i;
SELECT * FROM t1;
a | b
---------------------------------------------------------------------
1 | 2
2 | 4
3 | 6
4 | 8
5 | 10
(5 rows)
SELECT * FROM t2;
a | b
---------------------------------------------------------------------
1 | 0
2 | 0
3 | 0
(3 rows)
TRUNCATE t1;
TRUNCATE t2;
--
-- double insert on the same relation
--
WITH t AS (
INSERT INTO t1 SELECT i, 2*i FROM generate_series(1, 5) i RETURNING *
)
INSERT INTO t1 SELECT t.a, t.a+1 FROM t;
SELECT * FROM t1 ORDER BY a, b;
a | b
---------------------------------------------------------------------
1 | 2
1 | 2
2 | 3
2 | 4
3 | 4
3 | 6
4 | 5
4 | 8
5 | 6
5 | 10
(10 rows)
TRUNCATE t1;
TRUNCATE t2;
--
-- A test where the result of a UDF call depends on the execution
-- of previous UDF calls.
--
CREATE FUNCTION g(x INT) RETURNS INT AS $$
INSERT INTO t1 VALUES(x, x * 2);
SELECT count(*)::int FROM t1;
$$ LANGUAGE SQL;
-- t3 and t4 are heap tables to help with cross-checking results
CREATE TABLE t3(a int, b int);
CREATE TABLE t4(a int, b int);
CREATE FUNCTION g2(x INT) RETURNS INT AS $$
INSERT INTO t3 VALUES(x, x * 2);
SELECT count(*)::int FROM t3;
$$ LANGUAGE SQL;
INSERT INTO t2 SELECT i, g(i) FROM generate_series(1, 5) i;
INSERT INTO t4 SELECT i, g2(i) FROM generate_series(1, 5) i;
-- check that t1==t3 and t2==t4.
((table t1) except (table t3)) union ((table t3) except (table t1));
a | b
---------------------------------------------------------------------
(0 rows)
((table t2) except (table t4)) union ((table t4) except (table t2));
a | b
---------------------------------------------------------------------
(0 rows)
SELECT * FROM t2 ORDER BY a, b;
a | b
---------------------------------------------------------------------
1 | 1
2 | 2
3 | 3
4 | 4
5 | 5
(5 rows)
TRUNCATE t1, t2, t3, t4;
--
-- INSERT into the same relation that was INSERTed into in the UDF
--
INSERT INTO t1 SELECT i, g(i) FROM generate_series(1, 3) i;
INSERT INTO t3 SELECT i, g2(i) FROM generate_series(1, 3) i;
SELECT * FROM t1 ORDER BY a, b;
a | b
---------------------------------------------------------------------
1 | 1
1 | 2
2 | 3
2 | 4
3 | 5
3 | 6
(6 rows)
SELECT * FROM t3 ORDER BY a, b;
a | b
---------------------------------------------------------------------
1 | 1
1 | 2
2 | 3
2 | 4
3 | 5
3 | 6
(6 rows)
-- check that t1==t3 and t2==t4.
((table t1) except (table t3)) union ((table t3) except (table t1));
a | b
---------------------------------------------------------------------
(0 rows)
((table t2) except (table t4)) union ((table t4) except (table t2));
a | b
---------------------------------------------------------------------
(0 rows)
DROP FUNCTION g(int), g2(int);
TRUNCATE t1, t2, t3, t4;
--
-- EXCEPTION in plpgsql, which is implemented internally using
-- subtransactions. plpgsql uses SPI to execute INSERT statements.
--
CREATE FUNCTION f(a int) RETURNS VOID AS $$
DECLARE
x int;
BEGIN
INSERT INTO t1 SELECT i, i + 1 FROM generate_series(a, a + 1) i;
x := 10 / a;
INSERT INTO t1 SELECT i, i * 2 FROM generate_series(a + 2, a + 3) i;
EXCEPTION WHEN division_by_zero THEN
INSERT INTO t1 SELECT i, i + 1 FROM generate_series(a + 2, a + 3) i;
END;
$$ LANGUAGE plpgsql;
SELECT f(10);
f
---------------------------------------------------------------------
(1 row)
SELECT f(0), f(20);
f | f
---------------------------------------------------------------------
|
(1 row)
SELECT * FROM t1 ORDER BY a, b;
a | b
---------------------------------------------------------------------
2 | 3
3 | 4
10 | 11
11 | 12
12 | 24
13 | 26
20 | 21
21 | 22
22 | 44
23 | 46
(10 rows)
DROP FUNCTION f(int);
DROP TABLE t1, t2, t3, t4;


@ -37,6 +37,12 @@ WHERE a.relfilenode = b.relfilenode AND b.relname = 't';
BEGIN;
SAVEPOINT s0;
INSERT INTO t SELECT i, i+1 FROM generate_series(1, 10) i;
SELECT count(*) FROM t; -- force flush
count
---------------------------------------------------------------------
20
(1 row)
SAVEPOINT s1;
INSERT INTO t SELECT i, i+1 FROM generate_series(1, 10) i;
SELECT count(*) FROM t;


@ -0,0 +1,142 @@
--
-- Testing we handle transactions properly
--
CREATE TABLE t(a int, b int) USING cstore_tableam;
INSERT INTO t SELECT i, 2 * i FROM generate_series(1, 3) i;
SELECT * FROM t ORDER BY a;
a | b
---------------------------------------------------------------------
1 | 2
2 | 4
3 | 6
(3 rows)
-- verify that table rewrites work properly
BEGIN;
ALTER TABLE t ALTER COLUMN b TYPE float4 USING (b + 0.5)::float4;
INSERT INTO t VALUES (4, 8.5);
SELECT * FROM t ORDER BY a;
a | b
---------------------------------------------------------------------
1 | 2.5
2 | 4.5
3 | 6.5
4 | 8.5
(4 rows)
ROLLBACK;
SELECT * FROM t ORDER BY a;
a | b
---------------------------------------------------------------------
1 | 2
2 | 4
3 | 6
(3 rows)
-- verify truncate rollback
BEGIN;
TRUNCATE t;
INSERT INTO t VALUES (4, 8);
SELECT * FROM t ORDER BY a;
a | b
---------------------------------------------------------------------
4 | 8
(1 row)
SAVEPOINT s1;
TRUNCATE t;
SELECT * FROM t ORDER BY a;
a | b
---------------------------------------------------------------------
(0 rows)
ROLLBACK TO SAVEPOINT s1;
SELECT * FROM t ORDER BY a;
a | b
---------------------------------------------------------------------
4 | 8
(1 row)
ROLLBACK;
-- verify truncate with unflushed data in upper xacts
BEGIN;
INSERT INTO t VALUES (4, 8);
SAVEPOINT s1;
TRUNCATE t;
ROLLBACK TO SAVEPOINT s1;
COMMIT;
SELECT * FROM t ORDER BY a;
a | b
---------------------------------------------------------------------
1 | 2
2 | 4
3 | 6
4 | 8
(4 rows)
-- verify DROP TABLE rollback
BEGIN;
INSERT INTO t VALUES (5, 10);
SELECT * FROM t ORDER BY a;
a | b
---------------------------------------------------------------------
1 | 2
2 | 4
3 | 6
4 | 8
5 | 10
(5 rows)
SAVEPOINT s1;
DROP TABLE t;
SELECT * FROM t ORDER BY a;
ERROR: relation "t" does not exist
ROLLBACK TO SAVEPOINT s1;
SELECT * FROM t ORDER BY a;
a | b
---------------------------------------------------------------------
1 | 2
2 | 4
3 | 6
4 | 8
5 | 10
(5 rows)
ROLLBACK;
-- verify DROP TABLE with unflushed data in upper xacts
BEGIN;
INSERT INTO t VALUES (5, 10);
SAVEPOINT s1;
DROP TABLE t;
SELECT * FROM t ORDER BY a;
ERROR: relation "t" does not exist
ROLLBACK TO SAVEPOINT s1;
COMMIT;
SELECT * FROM t ORDER BY a;
a | b
---------------------------------------------------------------------
1 | 2
2 | 4
3 | 6
4 | 8
5 | 10
(5 rows)
-- verify that SELECT errors when there is unflushed data in upper transactions
BEGIN;
INSERT INTO t VALUES (6, 12);
SAVEPOINT s1;
SELECT * FROM t;
ERROR: cannot read from table when there is unflushed data in upper transactions
ROLLBACK;
SELECT * FROM t ORDER BY a;
a | b
---------------------------------------------------------------------
1 | 2
2 | 4
3 | 6
4 | 8
5 | 10
(5 rows)
DROP TABLE t;


@ -74,4 +74,98 @@ NOTICE: (3)
CONTEXT: PL/pgSQL function trs_after() line 14 at RAISE
NOTICE: (4)
CONTEXT: PL/pgSQL function trs_after() line 14 at RAISE
SELECT * FROM test_tr ORDER BY i;
i
---------------------------------------------------------------------
1
2
3
4
(4 rows)
drop table test_tr;
create table test_tr(i int) using cstore_tableam;
-- we should be able to clean up and continue gracefully if we
-- error out in AFTER STATEMENT triggers.
CREATE SEQUENCE counter START 100;
create or replace function trs_after_erroring() returns trigger language plpgsql as $$
BEGIN
IF nextval('counter') % 2 = 0 THEN
RAISE EXCEPTION '%', 'error';
END IF;
RETURN NULL;
END;
$$;
create trigger tr_after_stmt_erroring after insert on test_tr
referencing new table as new_table
for each statement execute procedure trs_after_erroring();
--
-- We used to not clean up properly after erroring out. Here the first
-- statement errors but the second succeeds. Because of the broken clean-up,
-- both rows used to be visible, while only the second one should be.
--
insert into test_tr values(5);
ERROR: error
CONTEXT: PL/pgSQL function trs_after_erroring() line 4 at RAISE
insert into test_tr values(6);
SELECT * FROM test_tr ORDER BY i;
i
---------------------------------------------------------------------
6
(1 row)
drop table test_tr;
--
-- https://github.com/citusdata/cstore2/issues/32
--
create table events(
user_id bigint,
event_id bigint,
event_time timestamp default now(),
value float default random())
PARTITION BY RANGE (event_time);
create table events_p2020_11_04_102965
PARTITION OF events FOR VALUES FROM ('2020-11-04 00:00:00+01') TO ('2020-11-05 00:00:00+01')
USING cstore_tableam;
create table events_trigger_target(
user_id bigint,
avg float,
__count__ bigint
) USING cstore_tableam;
CREATE OR REPLACE FUNCTION user_value_by_day()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
BEGIN
IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN
EXECUTE format($exec_format$INSERT INTO %s AS __mat__ SELECT user_id, 0.1 AS avg, pg_catalog.count(*) AS __count__ FROM __ins__ events GROUP BY user_id;
$exec_format$, TG_ARGV[0]);
END IF;
IF (TG_OP = 'DELETE' OR TG_OP = 'UPDATE') THEN
RAISE EXCEPTION $ex$MATERIALIZED VIEW 'user_value_by_day' on table 'events' does not support UPDATE/DELETE$ex$;
END IF;
IF (TG_OP = 'TRUNCATE') THEN
EXECUTE format($exec_format$TRUNCATE TABLE %s; $exec_format$, TG_ARGV[0]);
END IF;
RETURN NULL;
END;
$function$;
create trigger "user_value_by_day_INSERT" AFTER INSERT ON events
REFERENCING NEW TABLE AS __ins__
FOR EACH STATEMENT EXECUTE FUNCTION user_value_by_day('events_trigger_target');
COPY events FROM STDIN WITH (FORMAT 'csv');
SELECT * FROM events ORDER BY user_id;
user_id | event_id | event_time | value
---------------------------------------------------------------------
1 | 1 | Wed Nov 04 15:54:02.226999 2020 | 1.1
2 | 3 | Wed Nov 04 16:54:02.226999 2020 | 2.2
(2 rows)
SELECT * FROM events_trigger_target ORDER BY user_id;
user_id | avg | __count__
---------------------------------------------------------------------
1 | 0.1 | 1
2 | 0.1 | 1
(2 rows)
DROP TABLE events;


@ -11,7 +11,7 @@ step s1-insert:
INSERT INTO test_vacuum_vs_insert SELECT i, 2 * i FROM generate_series(1, 3) i;
s2: INFO: statistics for "test_vacuum_vs_insert":
total file size: 24576, total data size: 26
total file size: 16384, total data size: 26
total row count: 3, stripe count: 1, average rows per stripe: 3
block count: 2, containing data for dropped columns: 0, none compressed: 2, pglz compressed: 0


@ -0,0 +1,143 @@
CREATE TABLE t1(a int, b int) USING cstore_tableam;
CREATE TABLE t2(a int, b int) USING cstore_tableam;
CREATE FUNCTION f(x INT) RETURNS INT AS $$
INSERT INTO t1 VALUES(x, x * 2) RETURNING b - 1;
$$ LANGUAGE SQL;
--
-- The following query starts a write to t1 before the enclosing
-- write to t2 has finished, so it tests that we handle recursive writes
-- correctly.
--
INSERT INTO t2 SELECT i, f(i) FROM generate_series(1, 5) i;
-- there are no subtransactions, so the above statement should batch
-- INSERTs inside the UDF and create one stripe per table.
SELECT relname, count(*) FROM cstore.cstore_stripes a, pg_class b
WHERE a.relfilenode=b.relfilenode AND relname IN ('t1', 't2')
GROUP BY relname
ORDER BY relname;
SELECT * FROM t1 ORDER BY a;
SELECT * FROM t2 ORDER BY a;
TRUNCATE t1;
TRUNCATE t2;
DROP FUNCTION f(INT);
--
-- Test the case when 2 writes are going on concurrently in the
-- same executor, and those 2 writes are dependent.
--
WITH t AS (
INSERT INTO t1 SELECT i, 2*i FROM generate_series(1, 5) i RETURNING *
)
INSERT INTO t2 SELECT t.a, t.a+1 FROM t;
SELECT * FROM t1;
SELECT * FROM t2;
TRUNCATE t1;
TRUNCATE t2;
--
-- Test the case when there are 2 independent inserts in a CTE.
-- Also tests the case where some of the tuple_inserts happen in
-- ExecutorFinish() instead of ExecutorRun().
--
WITH t AS (
INSERT INTO t1 SELECT i, 2*i FROM generate_series(1, 5) i RETURNING *
)
INSERT INTO t2 SELECT i, (select count(*) from t1) FROM generate_series(1, 3) i;
SELECT * FROM t1;
SELECT * FROM t2;
TRUNCATE t1;
TRUNCATE t2;
--
-- double insert on the same relation
--
WITH t AS (
INSERT INTO t1 SELECT i, 2*i FROM generate_series(1, 5) i RETURNING *
)
INSERT INTO t1 SELECT t.a, t.a+1 FROM t;
SELECT * FROM t1 ORDER BY a, b;
TRUNCATE t1;
TRUNCATE t2;
--
-- A test where the result of a UDF call depends on the execution
-- of previous UDF calls.
--
CREATE FUNCTION g(x INT) RETURNS INT AS $$
INSERT INTO t1 VALUES(x, x * 2);
SELECT count(*)::int FROM t1;
$$ LANGUAGE SQL;
-- t3 and t4 are heap tables to help with cross-checking results
CREATE TABLE t3(a int, b int);
CREATE TABLE t4(a int, b int);
CREATE FUNCTION g2(x INT) RETURNS INT AS $$
INSERT INTO t3 VALUES(x, x * 2);
SELECT count(*)::int FROM t3;
$$ LANGUAGE SQL;
INSERT INTO t2 SELECT i, g(i) FROM generate_series(1, 5) i;
INSERT INTO t4 SELECT i, g2(i) FROM generate_series(1, 5) i;
-- check that t1==t3 and t2==t4.
((table t1) except (table t3)) union ((table t3) except (table t1));
((table t2) except (table t4)) union ((table t4) except (table t2));
SELECT * FROM t2 ORDER BY a, b;
TRUNCATE t1, t2, t3, t4;
--
-- INSERT into the same relation that was INSERTed into in the UDF
--
INSERT INTO t1 SELECT i, g(i) FROM generate_series(1, 3) i;
INSERT INTO t3 SELECT i, g2(i) FROM generate_series(1, 3) i;
SELECT * FROM t1 ORDER BY a, b;
SELECT * FROM t3 ORDER BY a, b;
-- check that t1==t3 and t2==t4.
((table t1) except (table t3)) union ((table t3) except (table t1));
((table t2) except (table t4)) union ((table t4) except (table t2));
DROP FUNCTION g(int), g2(int);
TRUNCATE t1, t2, t3, t4;
--
-- EXCEPTION in plpgsql, which is implemented internally using
-- subtransactions. plpgsql uses SPI to execute INSERT statements.
--
CREATE FUNCTION f(a int) RETURNS VOID AS $$
DECLARE
x int;
BEGIN
INSERT INTO t1 SELECT i, i + 1 FROM generate_series(a, a + 1) i;
x := 10 / a;
INSERT INTO t1 SELECT i, i * 2 FROM generate_series(a + 2, a + 3) i;
EXCEPTION WHEN division_by_zero THEN
INSERT INTO t1 SELECT i, i + 1 FROM generate_series(a + 2, a + 3) i;
END;
$$ LANGUAGE plpgsql;
SELECT f(10);
SELECT f(0), f(20);
SELECT * FROM t1 ORDER BY a, b;
DROP FUNCTION f(int);
DROP TABLE t1, t2, t3, t4;


@ -23,6 +23,7 @@ WHERE a.relfilenode = b.relfilenode AND b.relname = 't';
BEGIN;
SAVEPOINT s0;
INSERT INTO t SELECT i, i+1 FROM generate_series(1, 10) i;
SELECT count(*) FROM t; -- force flush
SAVEPOINT s1;
INSERT INTO t SELECT i, i+1 FROM generate_series(1, 10) i;
SELECT count(*) FROM t;


@ -0,0 +1,70 @@
--
-- Testing we handle transactions properly
--
CREATE TABLE t(a int, b int) USING cstore_tableam;
INSERT INTO t SELECT i, 2 * i FROM generate_series(1, 3) i;
SELECT * FROM t ORDER BY a;
-- verify that table rewrites work properly
BEGIN;
ALTER TABLE t ALTER COLUMN b TYPE float4 USING (b + 0.5)::float4;
INSERT INTO t VALUES (4, 8.5);
SELECT * FROM t ORDER BY a;
ROLLBACK;
SELECT * FROM t ORDER BY a;
-- verify truncate rollback
BEGIN;
TRUNCATE t;
INSERT INTO t VALUES (4, 8);
SELECT * FROM t ORDER BY a;
SAVEPOINT s1;
TRUNCATE t;
SELECT * FROM t ORDER BY a;
ROLLBACK TO SAVEPOINT s1;
SELECT * FROM t ORDER BY a;
ROLLBACK;
-- verify truncate with unflushed data in upper xacts
BEGIN;
INSERT INTO t VALUES (4, 8);
SAVEPOINT s1;
TRUNCATE t;
ROLLBACK TO SAVEPOINT s1;
COMMIT;
SELECT * FROM t ORDER BY a;
-- verify DROP TABLE rollback
BEGIN;
INSERT INTO t VALUES (5, 10);
SELECT * FROM t ORDER BY a;
SAVEPOINT s1;
DROP TABLE t;
SELECT * FROM t ORDER BY a;
ROLLBACK TO SAVEPOINT s1;
SELECT * FROM t ORDER BY a;
ROLLBACK;
-- verify DROP TABLE with unflushed data in upper xacts
BEGIN;
INSERT INTO t VALUES (5, 10);
SAVEPOINT s1;
DROP TABLE t;
SELECT * FROM t ORDER BY a;
ROLLBACK TO SAVEPOINT s1;
COMMIT;
SELECT * FROM t ORDER BY a;
-- verify that SELECT errors when there is unflushed data in upper transactions
BEGIN;
INSERT INTO t VALUES (6, 12);
SAVEPOINT s1;
SELECT * FROM t;
ROLLBACK;
SELECT * FROM t ORDER BY a;
DROP TABLE t;


@ -58,4 +58,87 @@ create trigger tr_after_row after insert on test_tr
insert into test_tr values(1);
insert into test_tr values(2),(3),(4);
SELECT * FROM test_tr ORDER BY i;
drop table test_tr;
create table test_tr(i int) using cstore_tableam;
-- we should be able to clean up and continue gracefully if we
-- error out in AFTER STATEMENT triggers.
CREATE SEQUENCE counter START 100;
create or replace function trs_after_erroring() returns trigger language plpgsql as $$
BEGIN
IF nextval('counter') % 2 = 0 THEN
RAISE EXCEPTION '%', 'error';
END IF;
RETURN NULL;
END;
$$;
create trigger tr_after_stmt_erroring after insert on test_tr
referencing new table as new_table
for each statement execute procedure trs_after_erroring();
--
-- We used to not clean up properly after erroring out. Here the first
-- statement errors but the second succeeds. Because of the broken clean-up,
-- both rows used to be visible, while only the second one should be.
--
insert into test_tr values(5);
insert into test_tr values(6);
SELECT * FROM test_tr ORDER BY i;
drop table test_tr;
--
-- https://github.com/citusdata/cstore2/issues/32
--
create table events(
user_id bigint,
event_id bigint,
event_time timestamp default now(),
value float default random())
PARTITION BY RANGE (event_time);
create table events_p2020_11_04_102965
PARTITION OF events FOR VALUES FROM ('2020-11-04 00:00:00+01') TO ('2020-11-05 00:00:00+01')
USING cstore_tableam;
create table events_trigger_target(
user_id bigint,
avg float,
__count__ bigint
) USING cstore_tableam;
CREATE OR REPLACE FUNCTION user_value_by_day()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
BEGIN
IF (TG_OP = 'INSERT' OR TG_OP = 'UPDATE') THEN
EXECUTE format($exec_format$INSERT INTO %s AS __mat__ SELECT user_id, 0.1 AS avg, pg_catalog.count(*) AS __count__ FROM __ins__ events GROUP BY user_id;
$exec_format$, TG_ARGV[0]);
END IF;
IF (TG_OP = 'DELETE' OR TG_OP = 'UPDATE') THEN
RAISE EXCEPTION $ex$MATERIALIZED VIEW 'user_value_by_day' on table 'events' does not support UPDATE/DELETE$ex$;
END IF;
IF (TG_OP = 'TRUNCATE') THEN
EXECUTE format($exec_format$TRUNCATE TABLE %s; $exec_format$, TG_ARGV[0]);
END IF;
RETURN NULL;
END;
$function$;
create trigger "user_value_by_day_INSERT" AFTER INSERT ON events
REFERENCING NEW TABLE AS __ins__
FOR EACH STATEMENT EXECUTE FUNCTION user_value_by_day('events_trigger_target');
COPY events FROM STDIN WITH (FORMAT 'csv');
1,1,"2020-11-04 15:54:02.226999-08",1.1
2,3,"2020-11-04 16:54:02.226999-08",2.2
\.
SELECT * FROM events ORDER BY user_id;
SELECT * FROM events_trigger_target ORDER BY user_id;
DROP TABLE events;