From d3c7d30a1ee01b7ea8db6ba58cca7573046d7ed1 Mon Sep 17 00:00:00 2001 From: Jeff Davis Date: Tue, 24 Aug 2021 17:15:10 -0700 Subject: [PATCH] WIP executor boundary --- src/backend/columnar/columnar_tableam.c | 158 +++++-- src/backend/columnar/write_state_management.c | 388 ------------------ src/include/columnar/columnar.h | 15 - 3 files changed, 122 insertions(+), 439 deletions(-) delete mode 100644 src/backend/columnar/write_state_management.c diff --git a/src/backend/columnar/columnar_tableam.c b/src/backend/columnar/columnar_tableam.c index 6630d0430..9c102f39e 100644 --- a/src/backend/columnar/columnar_tableam.c +++ b/src/backend/columnar/columnar_tableam.c @@ -67,6 +67,18 @@ #define VACUUM_TRUNCATE_LOCK_WAIT_INTERVAL 50 /* ms */ #define VACUUM_TRUNCATE_LOCK_TIMEOUT 4500 /* ms */ +typedef struct ColumnarExecLevel +{ + HTAB *hashtable; + struct ColumnarExecLevel *next; /* next higher level */ +} ColumnarExecLevel; + +typedef struct ColumnarWriterEntry +{ + RelFileNode relfilenode; + ColumnarWriteState *writeState; +} ColumnarWriterEntry; + /* * ColumnarScanDescData is the scan state passed between beginscan(), * getnextslot(), rescan(), and endscan() calls. @@ -108,6 +120,11 @@ typedef struct IndexFetchColumnarData static object_access_hook_type PrevObjectAccessHook = NULL; static ProcessUtility_hook_type PrevProcessUtilityHook = NULL; +static ExecutorStart_hook_type PrevExecutorStartHook = NULL; +static ExecutorFinish_hook_type PrevExecutorFinishHook = NULL; + +static ColumnarExecLevel *ColumnarExecLevelStack = NULL; +static MemoryContext ColumnarWriterContext = NULL; /* forward declaration for static functions */ static MemoryContext CreateColumnarScanMemoryContext(void); @@ -191,8 +208,6 @@ columnar_beginscan_extended(Relation relation, Snapshot snapshot, ParallelTableScanDesc parallel_scan, uint32 flags, Bitmapset *attr_needed, List *scanQual) { - Oid relfilenode = relation->rd_node.relNode; - /* * A memory context to use for scan-wide data, including the lazily * initialized read state. We assume that beginscan is called in a @@ -221,14 +236,6 @@ columnar_beginscan_extended(Relation relation, Snapshot snapshot, scan->scanQual = copyObject(scanQual); scan->scanContext = scanContext; - if (PendingWritesInUpperTransactions(relfilenode, GetCurrentSubTransactionId())) - { - elog(ERROR, - "cannot read from table when there is unflushed data in upper transactions"); - } - - FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId()); - MemoryContextSwitchTo(oldContext); return ((TableScanDesc) scan); @@ -421,16 +428,6 @@ columnar_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan) static IndexFetchTableData * columnar_index_fetch_begin(Relation rel) { - Oid relfilenode = rel->rd_node.relNode; - if (PendingWritesInUpperTransactions(relfilenode, GetCurrentSubTransactionId())) - { - /* XXX: maybe we can just flush the data and continue */ - elog(ERROR, "cannot read from index when there is unflushed data in " - "upper transactions"); - } - - FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId()); - MemoryContext scanContext = CreateColumnarScanMemoryContext(); MemoryContext oldContext = MemoryContextSwitchTo(scanContext); @@ -563,6 +560,56 @@ columnar_compute_xid_horizon_for_tuples(Relation rel, } +MemoryContext +GetWriteContextForDebug(void) +{ + return ColumnarWriterContext; +} + + +static ColumnarWriteState * +GetWriteState(Relation relation, TupleDesc tupdesc, bool iscopy) +{ + if (ColumnarWriterContext == NULL) + { + ColumnarWriterContext = AllocSetContextCreate(TopMemoryContext, + "Columnar Writers Memory Context", + ALLOCSET_DEFAULT_SIZES); + } + + ColumnarExecLevel *level = ColumnarExecLevelStack; + + if (level->hashtable == NULL) + { + HASHCTL info; + uint32 hashFlags = (HASH_ELEM | HASH_CONTEXT); + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(RelFileNode); + info.entrysize = sizeof(ColumnarWriterEntry); + info.hcxt = ColumnarWriterContext; + + level->hashtable = hash_create("columnar writers", + 64, &info, hashFlags); + } + + bool found; + ColumnarWriterEntry *entry = hash_search( + level->hashtable, &relation->rd_node, HASH_ENTER, &found); + + if (!found) + { + ColumnarOptions columnarOptions = { 0 }; + ReadColumnarOptions(relation->rd_id, &columnarOptions); + + entry->writeState = ColumnarBeginWrite(relation->rd_node, + columnarOptions, + tupdesc); + } + + return entry->writeState; +} + + static void columnar_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid, int options, BulkInsertState bistate) @@ -571,9 +618,8 @@ columnar_tuple_insert(Relation relation, TupleTableSlot *slot, CommandId cid, * columnar_init_write_state allocates the write state in a longer * lasting context, so no need to worry about it. */ - ColumnarWriteState *writeState = columnar_init_write_state(relation, - RelationGetDescr(relation), - GetCurrentSubTransactionId()); + ColumnarWriteState *writeState = GetWriteState(relation, RelationGetDescr(relation), false); + MemoryContext oldContext = MemoryContextSwitchTo(ColumnarWritePerTupleContext( writeState)); @@ -613,9 +659,8 @@ static void columnar_multi_insert(Relation relation, TupleTableSlot **slots, int ntuples, CommandId cid, int options, BulkInsertState bistate) { - ColumnarWriteState *writeState = columnar_init_write_state(relation, - RelationGetDescr(relation), - GetCurrentSubTransactionId()); + ColumnarWriteState *writeState = GetWriteState(relation, + RelationGetDescr(relation), true); ColumnarCheckLogicalReplication(relation); @@ -701,8 +746,6 @@ columnar_relation_set_new_filenode(Relation rel, */ if (rel->rd_node.relNode != newrnode->relNode) { - MarkRelfilenodeDropped(rel->rd_node.relNode, GetCurrentSubTransactionId()); - DeleteMetadataRows(rel->rd_node); } @@ -724,8 +767,6 @@ columnar_relation_nontransactional_truncate(Relation rel) { RelFileNode relfilenode = rel->rd_node; - NonTransactionDropWriteState(relfilenode.relNode); - /* Delete old relfilenode metadata */ DeleteMetadataRows(relfilenode); @@ -1687,7 +1728,6 @@ ColumnarXactCallback(XactEvent event, void *arg) case XACT_EVENT_ABORT: case XACT_EVENT_PARALLEL_ABORT: { - DiscardWriteStateForAllRels(GetCurrentSubTransactionId(), 0); break; } @@ -1695,7 +1735,6 @@ ColumnarXactCallback(XactEvent event, void *arg) case XACT_EVENT_PARALLEL_PRE_COMMIT: case XACT_EVENT_PRE_PREPARE: { - FlushWriteStateForAllRels(GetCurrentSubTransactionId(), 0); break; } } @@ -1717,19 +1756,60 @@ ColumnarSubXactCallback(SubXactEvent event, SubTransactionId mySubid, case SUBXACT_EVENT_ABORT_SUB: { - DiscardWriteStateForAllRels(mySubid, parentSubid); break; } case SUBXACT_EVENT_PRE_COMMIT_SUB: { - FlushWriteStateForAllRels(mySubid, parentSubid); break; } } } +static void +ColumnarExecutorStart(QueryDesc *queryDesc, int eflags) +{ + PrevExecutorStartHook(queryDesc, eflags); + + ColumnarExecLevel *level = palloc0(sizeof(ColumnarExecLevel)); + /* initialize hashtable lazily */ + level->next = ColumnarExecLevelStack; + ColumnarExecLevelStack = level; +} + + +static void +ColumnarFlushWriters(HTAB *hashtable) +{ + HASH_SEQ_STATUS status; + ColumnarWriterEntry *entry; + + hash_seq_init(&status, hashtable); + while ((entry = hash_seq_search(&status)) != 0) + { + ColumnarEndWrite(entry->writeState); + } + hash_destroy(hashtable); +} + + +static void +ColumnarExecutorFinish(QueryDesc *queryDesc) +{ + ColumnarExecLevel *level = ColumnarExecLevelStack; + + Assert(level != NULL); + if (level->hashtable != NULL) + { + ColumnarFlushWriters(level->hashtable); + } + ColumnarExecLevelStack = level->next; + + PrevExecutorFinishHook(queryDesc); +} + + void columnar_tableam_init() { @@ -1744,6 +1824,14 @@ columnar_tableam_init() standard_ProcessUtility; ProcessUtility_hook = ColumnarProcessUtility; + PrevExecutorStartHook = ExecutorStart_hook ? + ExecutorStart_hook : standard_ExecutorStart; + ExecutorStart_hook = ColumnarExecutorStart; + + PrevExecutorFinishHook = ExecutorFinish_hook ? + ExecutorFinish_hook : standard_ExecutorFinish; + ExecutorFinish_hook = ColumnarExecutorFinish; + columnar_customscan_init(); TTSOpsColumnar = TTSOpsVirtual; @@ -1824,8 +1912,6 @@ ColumnarTableDropHook(Oid relid) DeleteMetadataRows(relfilenode); DeleteColumnarTableOptions(rel->rd_id, true); - MarkRelfilenodeDropped(relfilenode.relNode, GetCurrentSubTransactionId()); - /* keep the lock since we did physical changes to the relation */ table_close(rel, NoLock); } diff --git a/src/backend/columnar/write_state_management.c b/src/backend/columnar/write_state_management.c deleted file mode 100644 index 69860ad57..000000000 --- a/src/backend/columnar/write_state_management.c +++ /dev/null @@ -1,388 +0,0 @@ - -#include "citus_version.h" - -#include "postgres.h" -#include "columnar/columnar.h" - - -#include - -#include "miscadmin.h" - -#include "access/genam.h" -#include "access/heapam.h" -#include "access/multixact.h" -#include "access/rewriteheap.h" -#include "access/tsmapi.h" -#if PG_VERSION_NUM >= 130000 -#include "access/heaptoast.h" -#else -#include "access/tuptoaster.h" -#endif -#include "access/xact.h" -#include "catalog/catalog.h" -#include "catalog/index.h" -#include "catalog/objectaccess.h" -#include "catalog/pg_am.h" -#include "catalog/pg_trigger.h" -#include "catalog/storage.h" -#include "catalog/storage_xlog.h" -#include "commands/progress.h" -#include "commands/vacuum.h" -#include "executor/executor.h" -#include "nodes/makefuncs.h" -#include "optimizer/plancat.h" -#include "pgstat.h" -#include "storage/bufmgr.h" -#include "storage/bufpage.h" -#include "storage/bufmgr.h" -#include "storage/lmgr.h" -#include "storage/predicate.h" -#include "storage/procarray.h" -#include "storage/smgr.h" -#include "tcop/utility.h" -#include "utils/builtins.h" -#include "utils/pg_rusage.h" -#include "utils/rel.h" -#include "utils/syscache.h" - -#include "columnar/columnar_customscan.h" -#include "columnar/columnar_tableam.h" -#include "columnar/columnar_version_compat.h" - - -/* - * Mapping from relfilenode to WriteStateMapEntry. This keeps write state for - * each relation. - */ -static HTAB *WriteStateMap = NULL; - -/* memory context for allocating WriteStateMap & all write states */ -static MemoryContext WriteStateContext = NULL; - -/* - * Each member of the writeStateStack in WriteStateMapEntry. This means that - * we did some inserts in the subtransaction subXid, and the state of those - * inserts is stored at writeState. Those writes can be flushed or unflushed. - */ -typedef struct SubXidWriteState -{ - SubTransactionId subXid; - ColumnarWriteState *writeState; - - struct SubXidWriteState *next; -} SubXidWriteState; - - -/* - * An entry in WriteStateMap. - */ -typedef struct WriteStateMapEntry -{ - /* key of the entry */ - Oid relfilenode; - - /* - * If a table is dropped, we set dropped to true and set dropSubXid to the - * id of the subtransaction in which the drop happened. - */ - bool dropped; - SubTransactionId dropSubXid; - - /* - * Stack of SubXidWriteState where first element is top of the stack. When - * inserts happen, we look at top of the stack. If top of stack belongs to - * current subtransaction, we forward writes to its writeState. Otherwise, - * we create a new stack entry for current subtransaction and push it to - * the stack, and forward writes to that. - */ - SubXidWriteState *writeStateStack; -} WriteStateMapEntry; - - -/* - * Memory context reset callback so we reset WriteStateMap to NULL at the end - * of transaction. WriteStateMap is allocated in & WriteStateMap, so its - * leaked reference can cause memory issues. - */ -static MemoryContextCallback cleanupCallback; -static void -CleanupWriteStateMap(void *arg) -{ - WriteStateMap = NULL; - WriteStateContext = NULL; -} - - -ColumnarWriteState * -columnar_init_write_state(Relation relation, TupleDesc tupdesc, - SubTransactionId currentSubXid) -{ - bool found; - - /* - * If this is the first call in current transaction, allocate the hash - * table. - */ - if (WriteStateMap == NULL) - { - WriteStateContext = - AllocSetContextCreate( - TopTransactionContext, - "Column Store Write State Management Context", - ALLOCSET_DEFAULT_SIZES); - HASHCTL info; - uint32 hashFlags = (HASH_ELEM | HASH_CONTEXT); - memset(&info, 0, sizeof(info)); - info.keysize = sizeof(Oid); - info.entrysize = sizeof(WriteStateMapEntry); - info.hcxt = WriteStateContext; - - WriteStateMap = hash_create("column store write state map", - 64, &info, hashFlags); - - cleanupCallback.arg = NULL; - cleanupCallback.func = &CleanupWriteStateMap; - cleanupCallback.next = NULL; - MemoryContextRegisterResetCallback(WriteStateContext, &cleanupCallback); - } - - WriteStateMapEntry *hashEntry = hash_search(WriteStateMap, &relation->rd_node.relNode, - HASH_ENTER, &found); - if (!found) - { - hashEntry->writeStateStack = NULL; - hashEntry->dropped = false; - } - - Assert(!hashEntry->dropped); - - /* - * If top of stack belongs to the current subtransaction, return its - * writeState, ... - */ - if (hashEntry->writeStateStack != NULL) - { - SubXidWriteState *stackHead = hashEntry->writeStateStack; - - if (stackHead->subXid == currentSubXid) - { - return stackHead->writeState; - } - } - - /* - * ... otherwise we need to create a new stack entry for the current - * subtransaction. - */ - MemoryContext oldContext = MemoryContextSwitchTo(WriteStateContext); - - ColumnarOptions columnarOptions = { 0 }; - ReadColumnarOptions(relation->rd_id, &columnarOptions); - - SubXidWriteState *stackEntry = palloc0(sizeof(SubXidWriteState)); - stackEntry->writeState = ColumnarBeginWrite(relation->rd_node, - columnarOptions, - tupdesc); - stackEntry->subXid = currentSubXid; - stackEntry->next = hashEntry->writeStateStack; - hashEntry->writeStateStack = stackEntry; - - MemoryContextSwitchTo(oldContext); - - return stackEntry->writeState; -} - - -/* - * Flushes pending writes for given relfilenode in the given subtransaction. - */ -void -FlushWriteStateForRelfilenode(Oid relfilenode, SubTransactionId currentSubXid) -{ - if (WriteStateMap == NULL) - { - return; - } - - WriteStateMapEntry *entry = hash_search(WriteStateMap, &relfilenode, HASH_FIND, NULL); - - Assert(!entry || !entry->dropped); - - if (entry && entry->writeStateStack != NULL) - { - SubXidWriteState *stackEntry = entry->writeStateStack; - if (stackEntry->subXid == currentSubXid) - { - ColumnarFlushPendingWrites(stackEntry->writeState); - } - } -} - - -/* - * Helper function for FlushWriteStateForAllRels and DiscardWriteStateForAllRels. - * Pops all of write states for current subtransaction, and depending on "commit" - * either flushes them or discards them. This also takes into account dropped - * tables, and either propagates the dropped flag to parent subtransaction or - * rolls back abort. - */ -static void -PopWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId parentSubXid, - bool commit) -{ - HASH_SEQ_STATUS status; - WriteStateMapEntry *entry; - - if (WriteStateMap == NULL) - { - return; - } - - hash_seq_init(&status, WriteStateMap); - while ((entry = hash_seq_search(&status)) != 0) - { - if (entry->writeStateStack == NULL) - { - continue; - } - - /* - * If the table has been dropped in current subtransaction, either - * commit the drop or roll it back. - */ - if (entry->dropped) - { - if (entry->dropSubXid == currentSubXid) - { - if (commit) - { - /* elevate drop to the upper subtransaction */ - entry->dropSubXid = parentSubXid; - } - else - { - /* abort the drop */ - entry->dropped = false; - } - } - } - - /* - * Otherwise, commit or discard pending writes. - */ - else - { - SubXidWriteState *stackHead = entry->writeStateStack; - if (stackHead->subXid == currentSubXid) - { - if (commit) - { - ColumnarEndWrite(stackHead->writeState); - } - - entry->writeStateStack = stackHead->next; - } - } - } -} - - -/* - * Called when current subtransaction is committed. - */ -void -FlushWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId parentSubXid) -{ - PopWriteStateForAllRels(currentSubXid, parentSubXid, true); -} - - -/* - * Called when current subtransaction is aborted. - */ -void -DiscardWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId parentSubXid) -{ - PopWriteStateForAllRels(currentSubXid, parentSubXid, false); -} - - -/* - * Called when the given relfilenode is dropped. - */ -void -MarkRelfilenodeDropped(Oid relfilenode, SubTransactionId currentSubXid) -{ - if (WriteStateMap == NULL) - { - return; - } - - WriteStateMapEntry *entry = hash_search(WriteStateMap, &relfilenode, HASH_FIND, - NULL); - if (!entry || entry->dropped) - { - return; - } - - entry->dropped = true; - entry->dropSubXid = currentSubXid; -} - - -/* - * Called when the given relfilenode is dropped in non-transactional TRUNCATE. - */ -void -NonTransactionDropWriteState(Oid relfilenode) -{ - if (WriteStateMap) - { - hash_search(WriteStateMap, &relfilenode, HASH_REMOVE, false); - } -} - - -/* - * Returns true if there are any pending writes in upper transactions. - */ -bool -PendingWritesInUpperTransactions(Oid relfilenode, SubTransactionId currentSubXid) -{ - if (WriteStateMap == NULL) - { - return false; - } - - WriteStateMapEntry *entry = hash_search(WriteStateMap, &relfilenode, HASH_FIND, NULL); - - if (entry && entry->writeStateStack != NULL) - { - SubXidWriteState *stackEntry = entry->writeStateStack; - - while (stackEntry != NULL) - { - if (stackEntry->subXid != currentSubXid && - ContainsPendingWrites(stackEntry->writeState)) - { - return true; - } - - stackEntry = stackEntry->next; - } - } - - return false; -} - - -/* - * GetWriteContextForDebug exposes WriteStateContext for debugging - * purposes. - */ -extern MemoryContext -GetWriteContextForDebug(void) -{ - return WriteStateContext; -} diff --git a/src/include/columnar/columnar.h b/src/include/columnar/columnar.h index a44072ffe..2bafe3c65 100644 --- a/src/include/columnar/columnar.h +++ b/src/include/columnar/columnar.h @@ -265,21 +265,6 @@ extern StripeMetadata * FindStripeWithHighestRowNumber(Relation relation, Snapshot snapshot); extern Datum columnar_relation_storageid(PG_FUNCTION_ARGS); - -/* write_state_management.c */ -extern ColumnarWriteState * columnar_init_write_state(Relation relation, TupleDesc - tupdesc, - SubTransactionId currentSubXid); -extern void FlushWriteStateForRelfilenode(Oid relfilenode, SubTransactionId - currentSubXid); -extern void FlushWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId - parentSubXid); -extern void DiscardWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId - parentSubXid); -extern void MarkRelfilenodeDropped(Oid relfilenode, SubTransactionId currentSubXid); -extern void NonTransactionDropWriteState(Oid relfilenode); -extern bool PendingWritesInUpperTransactions(Oid relfilenode, - SubTransactionId currentSubXid); extern MemoryContext GetWriteContextForDebug(void);