WIP executor boundary

pull/5217/head
Jeff Davis 2021-08-24 17:15:10 -07:00
parent d50830d4cc
commit d3c7d30a1e
3 changed files with 122 additions and 439 deletions

View File

@ -67,6 +67,18 @@
#define VACUUM_TRUNCATE_LOCK_WAIT_INTERVAL 50 /* ms */ #define VACUUM_TRUNCATE_LOCK_WAIT_INTERVAL 50 /* ms */
#define VACUUM_TRUNCATE_LOCK_TIMEOUT 4500 /* ms */ #define VACUUM_TRUNCATE_LOCK_TIMEOUT 4500 /* ms */
/*
 * ColumnarExecLevel is one element of ColumnarExecLevelStack. A level is
 * pushed by ColumnarExecutorStart() and popped by ColumnarExecutorFinish(),
 * which also flushes every writer registered in the level's hash table.
 */
typedef struct ColumnarExecLevel
{
/* RelFileNode -> ColumnarWriterEntry; stays NULL until GetWriteState() needs it */
HTAB *hashtable;
struct ColumnarExecLevel *next; /* next higher level */
} ColumnarExecLevel;
/*
 * ColumnarWriterEntry is the entry type of ColumnarExecLevel->hashtable.
 * It associates a relation's physical identity with its open write state.
 */
typedef struct ColumnarWriterEntry
{
/* hash key; must remain the first field of the entry */
RelFileNode relfilenode;
/* write state returned by ColumnarBeginWrite() for this relfilenode */
ColumnarWriteState *writeState;
} ColumnarWriterEntry;
/* /*
* ColumnarScanDescData is the scan state passed between beginscan(), * ColumnarScanDescData is the scan state passed between beginscan(),
* getnextslot(), rescan(), and endscan() calls. * getnextslot(), rescan(), and endscan() calls.
@ -108,6 +120,11 @@ typedef struct IndexFetchColumnarData
static object_access_hook_type PrevObjectAccessHook = NULL; static object_access_hook_type PrevObjectAccessHook = NULL;
static ProcessUtility_hook_type PrevProcessUtilityHook = NULL; static ProcessUtility_hook_type PrevProcessUtilityHook = NULL;
/*
 * Executor hooks that were installed before ours (or the standard executor
 * functions when none were); columnar_tableam_init() fills these in and our
 * own hooks chain to them.
 */
static ExecutorStart_hook_type PrevExecutorStartHook = NULL;
static ExecutorFinish_hook_type PrevExecutorFinishHook = NULL;
/* stack of executor levels; the head is the most recently started one */
static ColumnarExecLevel *ColumnarExecLevelStack = NULL;
/* parent context of the per-level writer hash tables; created on first write */
static MemoryContext ColumnarWriterContext = NULL;
/* forward declaration for static functions */ /* forward declaration for static functions */
static MemoryContext CreateColumnarScanMemoryContext(void); static MemoryContext CreateColumnarScanMemoryContext(void);
@ -191,8 +208,6 @@ columnar_beginscan_extended(Relation relation, Snapshot snapshot,
ParallelTableScanDesc parallel_scan, ParallelTableScanDesc parallel_scan,
uint32 flags, Bitmapset *attr_needed, List *scanQual) uint32 flags, Bitmapset *attr_needed, List *scanQual)
{ {
Oid relfilenode = relation->rd_node.relNode;
/* /*
* A memory context to use for scan-wide data, including the lazily * A memory context to use for scan-wide data, including the lazily
* initialized read state. We assume that beginscan is called in a * initialized read state. We assume that beginscan is called in a
@ -221,14 +236,6 @@ columnar_beginscan_extended(Relation relation, Snapshot snapshot,
scan->scanQual = copyObject(scanQual); scan->scanQual = copyObject(scanQual);
scan->scanContext = scanContext; scan->scanContext = scanContext;
if (PendingWritesInUpperTransactions(relfilenode, GetCurrentSubTransactionId()))
{
elog(ERROR,
"cannot read from table when there is unflushed data in upper transactions");
}
FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId());
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
return ((TableScanDesc) scan); return ((TableScanDesc) scan);
@ -421,16 +428,6 @@ columnar_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan)
static IndexFetchTableData * static IndexFetchTableData *
columnar_index_fetch_begin(Relation rel) columnar_index_fetch_begin(Relation rel)
{ {
Oid relfilenode = rel->rd_node.relNode;
if (PendingWritesInUpperTransactions(relfilenode, GetCurrentSubTransactionId()))
{
/* XXX: maybe we can just flush the data and continue */
elog(ERROR, "cannot read from index when there is unflushed data in "
"upper transactions");
}
FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId());
MemoryContext scanContext = CreateColumnarScanMemoryContext(); MemoryContext scanContext = CreateColumnarScanMemoryContext();
MemoryContext oldContext = MemoryContextSwitchTo(scanContext); MemoryContext oldContext = MemoryContextSwitchTo(scanContext);
@ -563,6 +560,56 @@ columnar_compute_xid_horizon_for_tuples(Relation rel,
} }
/*
 * GetWriteContextForDebug exposes ColumnarWriterContext for debugging
 * purposes. The result is NULL until GetWriteState() has created the
 * context.
 */
MemoryContext
GetWriteContextForDebug(void)
{
return ColumnarWriterContext;
}
/*
 * GetWriteState returns the write state of the given relation for the
 * executor level on top of ColumnarExecLevelStack, creating the level's
 * hash table and the relation's write state on first use.
 *
 * relation: relation being written to; its rd_node is the lookup key.
 * tupdesc:  tuple descriptor handed to ColumnarBeginWrite().
 * iscopy:   currently unused.
 *
 * Raises an ERROR when no executor level is active, because there would be
 * no level to own (and later flush) the write state.
 *
 * NOTE(review): the write state itself is created in the caller's current
 * memory context, while the hash table pointing to it lives in
 * ColumnarWriterContext. The write state must stay valid until
 * ColumnarExecutorFinish() flushes it -- confirm that all callers satisfy
 * this.
 */
static ColumnarWriteState *
GetWriteState(Relation relation, TupleDesc tupdesc, bool iscopy)
{
	ColumnarExecLevel *level = ColumnarExecLevelStack;

	if (level == NULL)
	{
		/* previously an unchecked NULL dereference */
		elog(ERROR, "cannot write to columnar table without an active executor level");
	}

	if (ColumnarWriterContext == NULL)
	{
		ColumnarWriterContext = AllocSetContextCreate(TopMemoryContext,
													  "Columnar Writers Memory Context",
													  ALLOCSET_DEFAULT_SIZES);
	}

	if (level->hashtable == NULL)
	{
		HASHCTL info;

		/*
		 * RelFileNode is a binary key, so HASH_BLOBS is required. Without
		 * it dynahash treats the key as a NUL-terminated string (or, on
		 * newer PostgreSQL versions, rejects the call), which makes
		 * distinct relfilenodes compare as equal.
		 */
		uint32 hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

		memset(&info, 0, sizeof(info));
		info.keysize = sizeof(RelFileNode);
		info.entrysize = sizeof(ColumnarWriterEntry);
		info.hcxt = ColumnarWriterContext;

		level->hashtable = hash_create("columnar writers",
									   64, &info, hashFlags);
	}

	ColumnarWriterEntry *entry = hash_search(level->hashtable, &relation->rd_node,
											 HASH_FIND, NULL);
	if (entry == NULL)
	{
		/*
		 * Build the write state before entering it into the hash table, so
		 * that an error thrown here cannot leave behind an entry whose
		 * writeState pointer is uninitialized.
		 */
		ColumnarOptions columnarOptions = { 0 };
		ReadColumnarOptions(relation->rd_id, &columnarOptions);

		ColumnarWriteState *writeState = ColumnarBeginWrite(relation->rd_node,
															columnarOptions,
															tupdesc);

		bool found = false;
		entry = hash_search(level->hashtable, &relation->rd_node,
							HASH_ENTER, &found);
		entry->writeState = writeState;
	}

	return entry->writeState;
}
static void static void
columnar_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid, columnar_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
int options, BulkInsertState bistate) int options, BulkInsertState bistate)
@ -571,9 +618,8 @@ columnar_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid,
* columnar_init_write_state allocates the write state in a longer * columnar_init_write_state allocates the write state in a longer
* lasting context, so no need to worry about it. * lasting context, so no need to worry about it.
*/ */
ColumnarWriteState *writeState = columnar_init_write_state(relation, ColumnarWriteState *writeState = GetWriteState(relation, RelationGetDescr(relation), false);
RelationGetDescr(relation),
GetCurrentSubTransactionId());
MemoryContext oldContext = MemoryContextSwitchTo(ColumnarWritePerTupleContext( MemoryContext oldContext = MemoryContextSwitchTo(ColumnarWritePerTupleContext(
writeState)); writeState));
@ -613,9 +659,8 @@ static void
columnar_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, columnar_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples,
CommandId cid, int options, BulkInsertState bistate) CommandId cid, int options, BulkInsertState bistate)
{ {
ColumnarWriteState *writeState = columnar_init_write_state(relation, ColumnarWriteState *writeState = GetWriteState(relation,
RelationGetDescr(relation), RelationGetDescr(relation), true);
GetCurrentSubTransactionId());
ColumnarCheckLogicalReplication(relation); ColumnarCheckLogicalReplication(relation);
@ -701,8 +746,6 @@ columnar_relation_set_new_filenode(Relation rel,
*/ */
if (rel->rd_node.relNode != newrnode->relNode) if (rel->rd_node.relNode != newrnode->relNode)
{ {
MarkRelfilenodeDropped(rel->rd_node.relNode, GetCurrentSubTransactionId());
DeleteMetadataRows(rel->rd_node); DeleteMetadataRows(rel->rd_node);
} }
@ -724,8 +767,6 @@ columnar_relation_nontransactional_truncate(Relation rel)
{ {
RelFileNode relfilenode = rel->rd_node; RelFileNode relfilenode = rel->rd_node;
NonTransactionDropWriteState(relfilenode.relNode);
/* Delete old relfilenode metadata */ /* Delete old relfilenode metadata */
DeleteMetadataRows(relfilenode); DeleteMetadataRows(relfilenode);
@ -1687,7 +1728,6 @@ ColumnarXactCallback(XactEvent event, void *arg)
case XACT_EVENT_ABORT: case XACT_EVENT_ABORT:
case XACT_EVENT_PARALLEL_ABORT: case XACT_EVENT_PARALLEL_ABORT:
{ {
DiscardWriteStateForAllRels(GetCurrentSubTransactionId(), 0);
break; break;
} }
@ -1695,7 +1735,6 @@ ColumnarXactCallback(XactEvent event, void *arg)
case XACT_EVENT_PARALLEL_PRE_COMMIT: case XACT_EVENT_PARALLEL_PRE_COMMIT:
case XACT_EVENT_PRE_PREPARE: case XACT_EVENT_PRE_PREPARE:
{ {
FlushWriteStateForAllRels(GetCurrentSubTransactionId(), 0);
break; break;
} }
} }
@ -1717,19 +1756,60 @@ ColumnarSubXactCallback(SubXactEvent event, SubTransactionId mySubid,
case SUBXACT_EVENT_ABORT_SUB: case SUBXACT_EVENT_ABORT_SUB:
{ {
DiscardWriteStateForAllRels(mySubid, parentSubid);
break; break;
} }
case SUBXACT_EVENT_PRE_COMMIT_SUB: case SUBXACT_EVENT_PRE_COMMIT_SUB:
{ {
FlushWriteStateForAllRels(mySubid, parentSubid);
break; break;
} }
} }
} }
/*
 * ColumnarExecutorStart is the ExecutorStart hook. It first chains to the
 * previously installed hook and then pushes a fresh, zeroed level onto
 * ColumnarExecLevelStack. The level's hash table is left NULL and is only
 * created once a write actually happens.
 *
 * NOTE(review): the level is popped only by ColumnarExecutorFinish(); an
 * executor run that never reaches ExecutorFinish leaves its level on the
 * stack -- confirm how that case is cleaned up.
 */
static void
ColumnarExecutorStart(QueryDesc *queryDesc, int eflags)
{
	ColumnarExecLevel *newLevel = NULL;

	PrevExecutorStartHook(queryDesc, eflags);

	/* palloc0 leaves newLevel->hashtable as NULL (lazy initialization) */
	newLevel = palloc0(sizeof(ColumnarExecLevel));
	newLevel->next = ColumnarExecLevelStack;

	ColumnarExecLevelStack = newLevel;
}
/*
 * ColumnarFlushWriters calls ColumnarEndWrite() on the write state of every
 * entry in the given hash table and then destroys the hash table.
 */
static void
ColumnarFlushWriters(HTAB *hashtable)
{
	HASH_SEQ_STATUS scan;
	ColumnarWriterEntry *writer = NULL;

	hash_seq_init(&scan, hashtable);

	for (writer = hash_seq_search(&scan);
		 writer != NULL;
		 writer = hash_seq_search(&scan))
	{
		ColumnarEndWrite(writer->writeState);
	}

	hash_destroy(hashtable);
}
/*
 * ColumnarExecutorFinish is the ExecutorFinish hook. It flushes all writers
 * registered at the current executor level, pops that level off
 * ColumnarExecLevelStack, and then chains to the previously installed hook.
 *
 * An empty stack is tolerated in non-assert builds: in that case there is
 * nothing to flush and only the previous hook is called, instead of
 * dereferencing a NULL pointer.
 */
static void
ColumnarExecutorFinish(QueryDesc *queryDesc)
{
	ColumnarExecLevel *level = ColumnarExecLevelStack;

	Assert(level != NULL);

	if (level != NULL)
	{
		if (level->hashtable != NULL)
		{
			/* ColumnarFlushWriters also destroys the hash table */
			ColumnarFlushWriters(level->hashtable);
			level->hashtable = NULL;
		}

		ColumnarExecLevelStack = level->next;

		/* the level was palloc'd in ColumnarExecutorStart; release it */
		pfree(level);
	}

	PrevExecutorFinishHook(queryDesc);
}
void void
columnar_tableam_init() columnar_tableam_init()
{ {
@ -1744,6 +1824,14 @@ columnar_tableam_init()
standard_ProcessUtility; standard_ProcessUtility;
ProcessUtility_hook = ColumnarProcessUtility; ProcessUtility_hook = ColumnarProcessUtility;
PrevExecutorStartHook = ExecutorStart_hook ?
ExecutorStart_hook : standard_ExecutorStart;
ExecutorStart_hook = ColumnarExecutorStart;
PrevExecutorFinishHook = ExecutorFinish_hook ?
ExecutorFinish_hook : standard_ExecutorFinish;
ExecutorFinish_hook = ColumnarExecutorFinish;
columnar_customscan_init(); columnar_customscan_init();
TTSOpsColumnar = TTSOpsVirtual; TTSOpsColumnar = TTSOpsVirtual;
@ -1824,8 +1912,6 @@ ColumnarTableDropHook(Oid relid)
DeleteMetadataRows(relfilenode); DeleteMetadataRows(relfilenode);
DeleteColumnarTableOptions(rel->rd_id, true); DeleteColumnarTableOptions(rel->rd_id, true);
MarkRelfilenodeDropped(relfilenode.relNode, GetCurrentSubTransactionId());
/* keep the lock since we did physical changes to the relation */ /* keep the lock since we did physical changes to the relation */
table_close(rel, NoLock); table_close(rel, NoLock);
} }

View File

@ -1,388 +0,0 @@
#include "citus_version.h"
#include "postgres.h"
#include "columnar/columnar.h"
#include <math.h>
#include "miscadmin.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/multixact.h"
#include "access/rewriteheap.h"
#include "access/tsmapi.h"
#if PG_VERSION_NUM >= 130000
#include "access/heaptoast.h"
#else
#include "access/tuptoaster.h"
#endif
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/index.h"
#include "catalog/objectaccess.h"
#include "catalog/pg_am.h"
#include "catalog/pg_trigger.h"
#include "catalog/storage.h"
#include "catalog/storage_xlog.h"
#include "commands/progress.h"
#include "commands/vacuum.h"
#include "executor/executor.h"
#include "nodes/makefuncs.h"
#include "optimizer/plancat.h"
#include "pgstat.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
#include "storage/bufmgr.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
#include "storage/procarray.h"
#include "storage/smgr.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "utils/pg_rusage.h"
#include "utils/rel.h"
#include "utils/syscache.h"
#include "columnar/columnar_customscan.h"
#include "columnar/columnar_tableam.h"
#include "columnar/columnar_version_compat.h"
/*
 * Mapping from relfilenode (Oid) to WriteStateMapEntry. This keeps write
 * state for each relation. It is created lazily by
 * columnar_init_write_state() and set back to NULL by CleanupWriteStateMap().
 */
static HTAB *WriteStateMap = NULL;
/*
 * Memory context for allocating WriteStateMap & all write states. It is
 * created as a child of TopTransactionContext.
 */
static MemoryContext WriteStateContext = NULL;
/*
 * Each member of the writeStateStack in WriteStateMapEntry. This means that
 * we did some inserts in the subtransaction subXid, and the state of those
 * inserts is stored at writeState. Those writes can be flushed or unflushed.
 */
typedef struct SubXidWriteState
{
/* subtransaction that performed the inserts */
SubTransactionId subXid;
/* write state created by ColumnarBeginWrite() for that subtransaction */
ColumnarWriteState *writeState;
/* entry below this one on the stack, or NULL at the bottom */
struct SubXidWriteState *next;
} SubXidWriteState;
/*
 * An entry in WriteStateMap.
 */
typedef struct WriteStateMapEntry
{
/* key of the entry */
Oid relfilenode;
/*
 * If a table is dropped, we set dropped to true and set dropSubXid to the
 * id of the subtransaction in which the drop happened. dropSubXid is only
 * meaningful while dropped is true.
 */
bool dropped;
SubTransactionId dropSubXid;
/*
 * Stack of SubXidWriteState where first element is top of the stack. When
 * inserts happen, we look at top of the stack. If top of stack belongs to
 * current subtransaction, we forward writes to its writeState. Otherwise,
 * we create a new stack entry for current subtransaction and push it to
 * the stack, and forward writes to that.
 */
SubXidWriteState *writeStateStack;
} WriteStateMapEntry;
/*
 * Memory context reset callback so we reset WriteStateMap to NULL at the end
 * of transaction. WriteStateMap is allocated in WriteStateContext, so a
 * reference that outlives the context would point to freed memory.
 */
static MemoryContextCallback cleanupCallback;
/*
 * CleanupWriteStateMap clears the module-level pointers to the write state
 * map and its memory context. It is registered on WriteStateContext by
 * columnar_init_write_state(); arg is unused.
 */
static void
CleanupWriteStateMap(void *arg)
{
WriteStateMap = NULL;
WriteStateContext = NULL;
}
/*
 * columnar_init_write_state returns the write state that inserts into the
 * given relation should use in subtransaction currentSubXid.
 *
 * On the first call in a transaction it creates WriteStateContext (a child
 * of TopTransactionContext) and WriteStateMap, and registers a reset
 * callback that clears both pointers when the context goes away.
 *
 * If the top of the relation's write state stack already belongs to
 * currentSubXid, that write state is returned. Otherwise a new write state
 * is created in WriteStateContext, pushed onto the stack and returned.
 */
ColumnarWriteState *
columnar_init_write_state(Relation relation, TupleDesc tupdesc,
						  SubTransactionId currentSubXid)
{
	bool found = false;

	/*
	 * If this is the first call in current transaction, allocate the hash
	 * table.
	 */
	if (WriteStateMap == NULL)
	{
		WriteStateContext =
			AllocSetContextCreate(
				TopTransactionContext,
				"Column Store Write State Management Context",
				ALLOCSET_DEFAULT_SIZES);

		HASHCTL info;

		/*
		 * The key is a binary Oid, so HASH_BLOBS is required. Without it
		 * dynahash hashes and compares the key as a NUL-terminated string
		 * (or, on newer PostgreSQL versions, rejects the call), so distinct
		 * relfilenodes can be treated as the same key.
		 */
		uint32 hashFlags = (HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

		memset(&info, 0, sizeof(info));
		info.keysize = sizeof(Oid);
		info.entrysize = sizeof(WriteStateMapEntry);
		info.hcxt = WriteStateContext;

		WriteStateMap = hash_create("column store write state map",
									64, &info, hashFlags);

		cleanupCallback.arg = NULL;
		cleanupCallback.func = &CleanupWriteStateMap;
		cleanupCallback.next = NULL;
		MemoryContextRegisterResetCallback(WriteStateContext, &cleanupCallback);
	}

	WriteStateMapEntry *hashEntry = hash_search(WriteStateMap,
												&relation->rd_node.relNode,
												HASH_ENTER, &found);
	if (!found)
	{
		/* dynahash only fills in the key; initialize every other field */
		hashEntry->writeStateStack = NULL;
		hashEntry->dropped = false;
		hashEntry->dropSubXid = InvalidSubTransactionId;
	}

	Assert(!hashEntry->dropped);

	/*
	 * If top of stack belongs to the current subtransaction, return its
	 * writeState, ...
	 */
	if (hashEntry->writeStateStack != NULL)
	{
		SubXidWriteState *stackHead = hashEntry->writeStateStack;

		if (stackHead->subXid == currentSubXid)
		{
			return stackHead->writeState;
		}
	}

	/*
	 * ... otherwise we need to create a new stack entry for the current
	 * subtransaction. Everything is allocated in WriteStateContext so that
	 * it lives as long as the map that references it.
	 */
	MemoryContext oldContext = MemoryContextSwitchTo(WriteStateContext);

	ColumnarOptions columnarOptions = { 0 };
	ReadColumnarOptions(relation->rd_id, &columnarOptions);

	SubXidWriteState *stackEntry = palloc0(sizeof(SubXidWriteState));
	stackEntry->writeState = ColumnarBeginWrite(relation->rd_node,
												columnarOptions,
												tupdesc);
	stackEntry->subXid = currentSubXid;
	stackEntry->next = hashEntry->writeStateStack;
	hashEntry->writeStateStack = stackEntry;

	MemoryContextSwitchTo(oldContext);

	return stackEntry->writeState;
}
/*
 * FlushWriteStateForRelfilenode flushes the pending writes of the given
 * relfilenode, but only those buffered by subtransaction currentSubXid,
 * i.e. only if the top of the relation's write state stack belongs to it.
 * Does nothing when no write state is tracked for the relfilenode.
 */
void
FlushWriteStateForRelfilenode(Oid relfilenode, SubTransactionId currentSubXid)
{
	if (WriteStateMap == NULL)
	{
		return;
	}

	WriteStateMapEntry *mapEntry = hash_search(WriteStateMap, &relfilenode,
											   HASH_FIND, NULL);

	Assert(!mapEntry || !mapEntry->dropped);

	if (mapEntry == NULL || mapEntry->writeStateStack == NULL)
	{
		return;
	}

	SubXidWriteState *top = mapEntry->writeStateStack;
	if (top->subXid == currentSubXid)
	{
		ColumnarFlushPendingWrites(top->writeState);
	}
}
/*
 * Helper function for FlushWriteStateForAllRels and DiscardWriteStateForAllRels.
 * Pops all of write states for current subtransaction, and depending on "commit"
 * either flushes them or discards them. This also takes into account dropped
 * tables, and either propagates the dropped flag to parent subtransaction or
 * rolls back the drop.
 *
 * currentSubXid: subtransaction that is ending.
 * parentSubXid:  its parent; a committed drop is re-attributed to it.
 * commit:        true to flush (ColumnarEndWrite) the popped write states,
 *                false to discard them.
 *
 * For a relation dropped in the ending subtransaction, the write state of
 * that subtransaction is popped without flushing: if the drop survives the
 * data is gone anyway, and if an enclosing subtransaction later rolls the
 * drop back, it rolls back these writes as well. Leaving the entry on the
 * stack would hide the parent's write state beneath it, so the parent's
 * pending writes would never be flushed.
 */
static void
PopWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId parentSubXid,
						bool commit)
{
	HASH_SEQ_STATUS status;
	WriteStateMapEntry *entry = NULL;

	if (WriteStateMap == NULL)
	{
		return;
	}

	hash_seq_init(&status, WriteStateMap);
	while ((entry = hash_seq_search(&status)) != NULL)
	{
		bool flushWrites = commit;

		/*
		 * Handle the dropped flag first, even when the stack is empty: an
		 * entry can be marked dropped after all of its write states were
		 * already popped, and the flag must still be propagated or undone.
		 */
		if (entry->dropped)
		{
			if (entry->dropSubXid != currentSubXid)
			{
				/* dropped by some other subtransaction; nothing ends here */
				continue;
			}

			if (commit)
			{
				/* elevate drop to the upper subtransaction */
				entry->dropSubXid = parentSubXid;
			}
			else
			{
				/* abort the drop */
				entry->dropped = false;
			}

			/* writes of the dropping subtransaction are never flushed */
			flushWrites = false;
		}

		/*
		 * Commit or discard pending writes of the ending subtransaction.
		 */
		SubXidWriteState *stackHead = entry->writeStateStack;
		if (stackHead != NULL && stackHead->subXid == currentSubXid)
		{
			if (flushWrites)
			{
				ColumnarEndWrite(stackHead->writeState);
			}

			entry->writeStateStack = stackHead->next;
		}
	}
}
/*
 * Called when current subtransaction is committed. Delegates to
 * PopWriteStateForAllRels() with commit = true for every relation tracked
 * in WriteStateMap.
 */
void
FlushWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId parentSubXid)
{
PopWriteStateForAllRels(currentSubXid, parentSubXid, true);
}
/*
 * Called when current subtransaction is aborted. Delegates to
 * PopWriteStateForAllRels() with commit = false for every relation tracked
 * in WriteStateMap.
 */
void
DiscardWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId parentSubXid)
{
PopWriteStateForAllRels(currentSubXid, parentSubXid, false);
}
/*
 * MarkRelfilenodeDropped records that the given relfilenode was dropped in
 * subtransaction currentSubXid. It is a no-op when no write state is tracked
 * for the relfilenode or when the relfilenode is already marked as dropped
 * (in which case the original dropSubXid is kept).
 */
void
MarkRelfilenodeDropped(Oid relfilenode, SubTransactionId currentSubXid)
{
	WriteStateMapEntry *mapEntry = NULL;

	if (WriteStateMap != NULL)
	{
		mapEntry = hash_search(WriteStateMap, &relfilenode, HASH_FIND, NULL);
	}

	if (mapEntry != NULL && !mapEntry->dropped)
	{
		mapEntry->dropped = true;
		mapEntry->dropSubXid = currentSubXid;
	}
}
/*
 * NonTransactionDropWriteState removes the map entry of the given
 * relfilenode, if any. Called when the relfilenode is dropped in
 * non-transactional TRUNCATE.
 */
void
NonTransactionDropWriteState(Oid relfilenode)
{
	if (WriteStateMap == NULL)
	{
		return;
	}

	/* the caller does not care whether an entry actually existed */
	hash_search(WriteStateMap, &relfilenode, HASH_REMOVE, NULL);
}
/*
 * PendingWritesInUpperTransactions returns true if the write state stack of
 * the given relfilenode holds an entry that belongs to a subtransaction
 * other than currentSubXid and still contains pending writes.
 */
bool
PendingWritesInUpperTransactions(Oid relfilenode, SubTransactionId currentSubXid)
{
	if (WriteStateMap == NULL)
	{
		return false;
	}

	WriteStateMapEntry *mapEntry = hash_search(WriteStateMap, &relfilenode,
											   HASH_FIND, NULL);
	if (mapEntry == NULL)
	{
		return false;
	}

	/* walk the stack from top to bottom */
	for (SubXidWriteState *cursor = mapEntry->writeStateStack;
		 cursor != NULL;
		 cursor = cursor->next)
	{
		if (cursor->subXid == currentSubXid)
		{
			continue;
		}

		if (ContainsPendingWrites(cursor->writeState))
		{
			return true;
		}
	}

	return false;
}
/*
 * GetWriteContextForDebug exposes WriteStateContext for debugging
 * purposes. The result is NULL until columnar_init_write_state() has
 * created the context, and again after CleanupWriteStateMap() has run.
 */
extern MemoryContext
GetWriteContextForDebug(void)
{
return WriteStateContext;
}

View File

@ -265,21 +265,6 @@ extern StripeMetadata * FindStripeWithHighestRowNumber(Relation relation,
Snapshot snapshot); Snapshot snapshot);
extern Datum columnar_relation_storageid(PG_FUNCTION_ARGS); extern Datum columnar_relation_storageid(PG_FUNCTION_ARGS);
/* write_state_management.c */
extern ColumnarWriteState * columnar_init_write_state(Relation relation, TupleDesc
tupdesc,
SubTransactionId currentSubXid);
extern void FlushWriteStateForRelfilenode(Oid relfilenode, SubTransactionId
currentSubXid);
extern void FlushWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId
parentSubXid);
extern void DiscardWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId
parentSubXid);
extern void MarkRelfilenodeDropped(Oid relfilenode, SubTransactionId currentSubXid);
extern void NonTransactionDropWriteState(Oid relfilenode);
extern bool PendingWritesInUpperTransactions(Oid relfilenode,
SubTransactionId currentSubXid);
extern MemoryContext GetWriteContextForDebug(void); extern MemoryContext GetWriteContextForDebug(void);