From caabbf4b846c869bb1da662a157560f29a8aa447 Mon Sep 17 00:00:00 2001 From: Nils Dijk Date: Tue, 18 Aug 2020 22:55:56 +0200 Subject: [PATCH] Table access method support for distributed tables --- .../commands/create_distributed_table.c | 77 +-- .../distributed/deparser/citus_ruleutils.c | 22 + src/backend/distributed/metadata/dependency.c | 13 + src/backend/distributed/test/blackhole_am.c | 504 --------------- src/backend/distributed/test/fake_am.c | 591 ++++++++++++++++++ src/include/distributed/version_compat.h | 22 + src/test/regress/bin/normalize.sed | 4 + src/test/regress/expected/pg12.out | 16 +- src/test/regress/expected/tableam.out | 323 ++++++++++ src/test/regress/expected/tableam_0.out | 6 + src/test/regress/multi_schedule | 1 + src/test/regress/sql/pg12.sql | 11 - src/test/regress/sql/tableam.sql | 149 +++++ 13 files changed, 1147 insertions(+), 592 deletions(-) delete mode 100644 src/backend/distributed/test/blackhole_am.c create mode 100644 src/backend/distributed/test/fake_am.c create mode 100644 src/test/regress/expected/tableam.out create mode 100644 src/test/regress/expected/tableam_0.out create mode 100644 src/test/regress/sql/tableam.sql diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 2ec094f0d..c8f3ac587 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -119,7 +119,6 @@ static List * TupleDescColumnNameList(TupleDesc tupleDescriptor); static bool RelationUsesIdentityColumns(TupleDesc relationDesc); static bool DistributionColumnUsesGeneratedStoredColumn(TupleDesc relationDesc, Var *distributionColumn); -static bool RelationUsesHeapAccessMethodOrNone(Relation relation); static bool CanUseExclusiveConnections(Oid relationId, bool localTableEmpty); static void DoCopyFromLocalTableIntoShards(Relation distributedRelation, DestReceiver *copyDest, @@ -156,16 +155,6 @@ 
master_create_distributed_table(PG_FUNCTION_ARGS) char *colocateWithTableName = NULL; bool viaDeprecatedAPI = true; - ObjectAddress tableAddress = { 0 }; - - /* - * distributed tables might have dependencies on different objects, since we create - * shards for a distributed table via multiple sessions these objects will be created - * via their own connection and committed immediately so they become visible to all - * sessions creating shards. - */ - ObjectAddressSet(tableAddress, RelationRelationId, relationId); - EnsureDependenciesExistOnAllNodes(&tableAddress); /* * Lock target relation with an exclusive lock - there's no way to make @@ -203,8 +192,6 @@ master_create_distributed_table(PG_FUNCTION_ARGS) Datum create_distributed_table(PG_FUNCTION_ARGS) { - ObjectAddress tableAddress = { 0 }; - bool viaDeprecatedAPI = false; Oid relationId = PG_GETARG_OID(0); @@ -216,15 +203,6 @@ create_distributed_table(PG_FUNCTION_ARGS) EnsureCitusTableCanBeCreated(relationId); - /* - * distributed tables might have dependencies on different objects, since we create - * shards for a distributed table via multiple sessions these objects will be created - * via their own connection and committed immediately so they become visible to all - * sessions creating shards. 
- */ - ObjectAddressSet(tableAddress, RelationRelationId, relationId); - EnsureDependenciesExistOnAllNodes(&tableAddress); - /* * Lock target relation with an exclusive lock - there's no way to make * sense of this table until we've committed, and we don't want multiple @@ -267,7 +245,6 @@ create_reference_table(PG_FUNCTION_ARGS) char *colocateWithTableName = NULL; Var *distributionColumn = NULL; - ObjectAddress tableAddress = { 0 }; bool viaDeprecatedAPI = false; @@ -275,15 +252,6 @@ create_reference_table(PG_FUNCTION_ARGS) EnsureCitusTableCanBeCreated(relationId); - /* - * distributed tables might have dependencies on different objects, since we create - * shards for a distributed table via multiple sessions these objects will be created - * via their own connection and committed immediately so they become visible to all - * sessions creating shards. - */ - ObjectAddressSet(tableAddress, RelationRelationId, relationId); - EnsureDependenciesExistOnAllNodes(&tableAddress); - /* * Lock target relation with an exclusive lock - there's no way to make * sense of this table until we've committed, and we don't want multiple @@ -387,6 +355,16 @@ void CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributionMethod, char *colocateWithTableName, bool viaDeprecatedAPI) { + /* + * distributed tables might have dependencies on different objects, since we create + * shards for a distributed table via multiple sessions these objects will be created + * via their own connection and committed immediately so they become visible to all + * sessions creating shards. 
+ */ + ObjectAddress tableAddress = { 0 }; + ObjectAddressSet(tableAddress, RelationRelationId, relationId); + EnsureDependenciesExistOnAllNodes(&tableAddress); + char replicationModel = DecideReplicationModel(distributionMethod, viaDeprecatedAPI); @@ -704,12 +682,6 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn, ErrorIfTableIsACatalogTable(relation); - if (!RelationUsesHeapAccessMethodOrNone(relation)) - { - ereport(ERROR, (errmsg( - "cannot distribute relations using non-heap access methods"))); - } - #if PG_VERSION_NUM < PG_VERSION_12 /* verify target relation does not use WITH (OIDS) PostgreSQL feature */ @@ -1376,8 +1348,7 @@ CopyLocalDataIntoShards(Oid distributedRelationId) /* get the table columns */ TupleDesc tupleDescriptor = RelationGetDescr(distributedRelation); - TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(tupleDescriptor, - &TTSOpsHeapTuple); + TupleTableSlot *slot = CreateTableSlotForRel(distributedRelation); List *columnNameList = TupleDescColumnNameList(tupleDescriptor); int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX; @@ -1441,15 +1412,9 @@ DoCopyFromLocalTableIntoShards(Relation distributedRelation, MemoryContext oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); uint64 rowsCopied = 0; - HeapTuple tuple = NULL; - while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + while (table_scan_getnextslot(scan, ForwardScanDirection, slot)) { - /* materialize tuple and send it to a shard */ -#if PG_VERSION_NUM >= PG_VERSION_12 - ExecStoreHeapTuple(tuple, slot, false); -#else - ExecStoreTuple(tuple, slot, InvalidBuffer, false); -#endif + /* send the tuple to a shard */ copyDest->receiveSlot(slot, copyDest); /* clear tuple memory */ @@ -1572,22 +1537,6 @@ DistributionColumnUsesGeneratedStoredColumn(TupleDesc relationDesc, } -/* - * Returns whether given relation uses default access method - */ -static bool -RelationUsesHeapAccessMethodOrNone(Relation relation) -{ -#if 
PG_VERSION_NUM >= PG_VERSION_12 - - return relation->rd_rel->relkind != RELKIND_RELATION || - relation->rd_amhandler == HEAP_TABLE_AM_HANDLER_OID; -#else - return true; -#endif -} - - /* * UndistributeTable undistributes the given table. The undistribution is done by * creating a new table, moving everything to the new table and dropping the old one. diff --git a/src/backend/distributed/deparser/citus_ruleutils.c b/src/backend/distributed/deparser/citus_ruleutils.c index e053890cb..73efcd67d 100644 --- a/src/backend/distributed/deparser/citus_ruleutils.c +++ b/src/backend/distributed/deparser/citus_ruleutils.c @@ -26,6 +26,7 @@ #include "catalog/dependency.h" #include "catalog/indexing.h" #include "catalog/namespace.h" +#include "catalog/pg_am.h" #include "catalog/pg_attribute.h" #include "catalog/pg_authid.h" #include "catalog/pg_class.h" @@ -451,6 +452,27 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults) appendStringInfo(&buffer, " PARTITION BY %s ", partitioningInformation); } +#if PG_VERSION_NUM >= 120000 + + /* + * Add table access methods for pg12 and higher when the table is configured with an + * access method + */ + if (OidIsValid(relation->rd_rel->relam)) + { + HeapTuple amTup = SearchSysCache1(AMOID, ObjectIdGetDatum( + relation->rd_rel->relam)); + if (!HeapTupleIsValid(amTup)) + { + elog(ERROR, "cache lookup failed for access method %u", + relation->rd_rel->relam); + } + Form_pg_am amForm = (Form_pg_am) GETSTRUCT(amTup); + appendStringInfo(&buffer, " USING %s", quote_identifier(NameStr(amForm->amname))); + ReleaseSysCache(amTup); + } +#endif + /* * Add any reloptions (storage parameters) defined on the table in a WITH * clause. 
diff --git a/src/backend/distributed/metadata/dependency.c b/src/backend/distributed/metadata/dependency.c index 8d97df9ac..6dacf2cc3 100644 --- a/src/backend/distributed/metadata/dependency.c +++ b/src/backend/distributed/metadata/dependency.c @@ -546,6 +546,19 @@ SupportedDependencyByCitus(const ObjectAddress *address) */ switch (getObjectClass(address)) { +#if PG_VERSION_NUM >= 120000 + case OCLASS_AM: + { + /* + * Only support access methods if they came from extensions. + * During the dependency resolution it will cascade into the extension and + * distribute that one instead of the Access Method. Now access methods can + * be configured on tables on the workers. + */ + return IsObjectAddressOwnedByExtension(address, NULL); + } +#endif + case OCLASS_COLLATION: case OCLASS_SCHEMA: { diff --git a/src/backend/distributed/test/blackhole_am.c b/src/backend/distributed/test/blackhole_am.c deleted file mode 100644 index 02bef7bd6..000000000 --- a/src/backend/distributed/test/blackhole_am.c +++ /dev/null @@ -1,504 +0,0 @@ -/*------------------------------------------------------------------------- - * - * blackhole_am.c - * blackhole table access method code - * - * Portions Copyright (c) 1996-PostgreSQL Global Development Group - * Portions Copyright (c) 1994, Regents of the University of California - * - * - * IDENTIFICATION - * Copied from https://github.com/michaelpq/pg_plugins/blob/master/blackhole_am/blackhole_am.c - * - * - * NOTES - * This file introduces the table access method blackhole, which can - * be used as a template for other table access methods, and guarantees - * that any data inserted into it gets sent to the void. 
- * - *------------------------------------------------------------------------- - */ - -/* *INDENT-OFF* */ -#include "postgres.h" - -#include "distributed/pg_version_constants.h" - -#if PG_VERSION_NUM >= PG_VERSION_12 - -#include "access/tableam.h" -#include "access/heapam.h" -#include "access/amapi.h" -#include "catalog/index.h" -#include "commands/vacuum.h" -#include "executor/tuptable.h" - -PG_FUNCTION_INFO_V1(blackhole_am_handler); - -/* Base structures for scans */ -typedef struct BlackholeScanDescData -{ - TableScanDescData rs_base; /* AM independent part of the descriptor */ - - /* Add more fields here as needed by the AM. */ -} BlackholeScanDescData; -typedef struct BlackholeScanDescData *BlackholeScanDesc; - -static const TableAmRoutine blackhole_methods; - - -/* ------------------------------------------------------------------------ - * Slot related callbacks for blackhole AM - * ------------------------------------------------------------------------ - */ - -static const TupleTableSlotOps * -blackhole_slot_callbacks(Relation relation) -{ - /* - * Here you would most likely want to invent your own set of - * slot callbacks for your AM. 
- */ - return &TTSOpsMinimalTuple; -} - -/* ------------------------------------------------------------------------ - * Table Scan Callbacks for blackhole AM - * ------------------------------------------------------------------------ - */ - -static TableScanDesc -blackhole_scan_begin(Relation relation, Snapshot snapshot, - int nkeys, ScanKey key, - ParallelTableScanDesc parallel_scan, - uint32 flags) -{ - BlackholeScanDesc scan; - - scan = (BlackholeScanDesc) palloc(sizeof(BlackholeScanDescData)); - - scan->rs_base.rs_rd = relation; - scan->rs_base.rs_snapshot = snapshot; - scan->rs_base.rs_nkeys = nkeys; - scan->rs_base.rs_flags = flags; - scan->rs_base.rs_parallel = parallel_scan; - - return (TableScanDesc) scan; } - -static void -blackhole_scan_end(TableScanDesc sscan) -{ - BlackholeScanDesc scan = (BlackholeScanDesc) sscan; - pfree(scan); -} - -static void -blackhole_scan_rescan(TableScanDesc sscan, ScanKey key, bool set_params, - bool allow_strat, bool allow_sync, bool allow_pagemode) -{ - /* nothing to do */ -} - -static bool -blackhole_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, - TupleTableSlot *slot) -{ - /* nothing to do */ - return false; -} - -/* ------------------------------------------------------------------------ - * Index Scan Callbacks for blackhole AM - * ------------------------------------------------------------------------ - */ - -static IndexFetchTableData * -blackhole_index_fetch_begin(Relation rel) -{ - return NULL; -} - -static void -blackhole_index_fetch_reset(IndexFetchTableData *scan) -{ - /* nothing to do here */ -} - -static void -blackhole_index_fetch_end(IndexFetchTableData *scan) -{ - /* nothing to do here */ -} - -static bool -blackhole_index_fetch_tuple(struct IndexFetchTableData *scan, - ItemPointer tid, - Snapshot snapshot, - TupleTableSlot *slot, - bool *call_again, bool *all_dead) -{ - /* there is no data */ - return 0; -} - - -/* ------------------------------------------------------------------------ 
- * Callbacks for non-modifying operations on individual tuples for - * blackhole AM. - * ------------------------------------------------------------------------ - */ - -static bool -blackhole_fetch_row_version(Relation relation, - ItemPointer tid, - Snapshot snapshot, - TupleTableSlot *slot) -{ - /* nothing to do */ - return false; -} - -static void -blackhole_get_latest_tid(TableScanDesc sscan, - ItemPointer tid) -{ - /* nothing to do */ -} - -static bool -blackhole_tuple_tid_valid(TableScanDesc scan, ItemPointer tid) -{ - return false; -} - -static bool -blackhole_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot, - Snapshot snapshot) -{ - return false; -} - -static TransactionId -blackhole_compute_xid_horizon_for_tuples(Relation rel, - ItemPointerData *tids, - int nitems) -{ - return InvalidTransactionId; -} - -/* ---------------------------------------------------------------------------- - * Functions for manipulations of physical tuples for blackhole AM. - * ---------------------------------------------------------------------------- - */ - -static void -blackhole_tuple_insert(Relation relation, TupleTableSlot *slot, - CommandId cid, int options, BulkInsertState bistate) -{ - /* nothing to do */ -} - -static void -blackhole_tuple_insert_speculative(Relation relation, TupleTableSlot *slot, - CommandId cid, int options, - BulkInsertState bistate, - uint32 specToken) -{ - /* nothing to do */ -} - -static void -blackhole_tuple_complete_speculative(Relation relation, TupleTableSlot *slot, - uint32 spekToken, bool succeeded) -{ - /* nothing to do */ -} - -static void -blackhole_multi_insert(Relation relation, TupleTableSlot **slots, - int ntuples, CommandId cid, int options, - BulkInsertState bistate) -{ - /* nothing to do */ -} - -static TM_Result -blackhole_tuple_delete(Relation relation, ItemPointer tid, CommandId cid, - Snapshot snapshot, Snapshot crosscheck, bool wait, - TM_FailureData *tmfd, bool changingPart) -{ - /* nothing to do, so it is 
always OK */ - return TM_Ok; -} - - -static TM_Result -blackhole_tuple_update(Relation relation, ItemPointer otid, - TupleTableSlot *slot, CommandId cid, - Snapshot snapshot, Snapshot crosscheck, - bool wait, TM_FailureData *tmfd, - LockTupleMode *lockmode, bool *update_indexes) -{ - /* nothing to do, so it is always OK */ - return TM_Ok; -} - -static TM_Result -blackhole_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot, - TupleTableSlot *slot, CommandId cid, LockTupleMode mode, - LockWaitPolicy wait_policy, uint8 flags, - TM_FailureData *tmfd) -{ - /* nothing to do, so it is always OK */ - return TM_Ok; -} - -static void -blackhole_finish_bulk_insert(Relation relation, int options) -{ - /* nothing to do */ -} - - -/* ------------------------------------------------------------------------ - * DDL related callbacks for blackhole AM. - * ------------------------------------------------------------------------ - */ - -static void -blackhole_relation_set_new_filenode(Relation rel, - const RelFileNode *newrnode, - char persistence, - TransactionId *freezeXid, - MultiXactId *minmulti) -{ - /* nothing to do */ -} - -static void -blackhole_relation_nontransactional_truncate(Relation rel) -{ - /* nothing to do */ -} - -static void -blackhole_copy_data(Relation rel, const RelFileNode *newrnode) -{ - /* there is no data */ -} - -static void -blackhole_copy_for_cluster(Relation OldTable, Relation NewTable, - Relation OldIndex, bool use_sort, - TransactionId OldestXmin, - TransactionId *xid_cutoff, - MultiXactId *multi_cutoff, - double *num_tuples, - double *tups_vacuumed, - double *tups_recently_dead) -{ - /* no data, so nothing to do */ -} - -static void -blackhole_vacuum(Relation onerel, VacuumParams *params, - BufferAccessStrategy bstrategy) -{ - /* no data, so nothing to do */ -} - -static bool -blackhole_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno, - BufferAccessStrategy bstrategy) -{ - /* no data, so no point to analyze next block */ 
- return false; -} - -static bool -blackhole_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin, - double *liverows, double *deadrows, - TupleTableSlot *slot) -{ - /* no data, so no point to analyze next tuple */ - return false; -} - -static double -blackhole_index_build_range_scan(Relation tableRelation, - Relation indexRelation, - IndexInfo *indexInfo, - bool allow_sync, - bool anyvisible, - bool progress, - BlockNumber start_blockno, - BlockNumber numblocks, - IndexBuildCallback callback, - void *callback_state, - TableScanDesc scan) -{ - /* no data, so no tuples */ - return 0; -} - -static void -blackhole_index_validate_scan(Relation tableRelation, - Relation indexRelation, - IndexInfo *indexInfo, - Snapshot snapshot, - ValidateIndexState *state) -{ - /* nothing to do */ -} - - -/* ------------------------------------------------------------------------ - * Miscellaneous callbacks for the blackhole AM - * ------------------------------------------------------------------------ - */ - -static uint64 -blackhole_relation_size(Relation rel, ForkNumber forkNumber) -{ - /* there is nothing */ - return 0; -} - -/* - * Check to see whether the table needs a TOAST table. 
- */ -static bool -blackhole_relation_needs_toast_table(Relation rel) -{ - /* no data, so no toast table needed */ - return false; -} - - -/* ------------------------------------------------------------------------ - * Planner related callbacks for the blackhole AM - * ------------------------------------------------------------------------ - */ - -static void -blackhole_estimate_rel_size(Relation rel, int32 *attr_widths, - BlockNumber *pages, double *tuples, - double *allvisfrac) -{ - /* no data available */ - *attr_widths = 0; - *tuples = 0; - *allvisfrac = 0; - *pages = 0; -} - - -/* ------------------------------------------------------------------------ - * Executor related callbacks for the blackhole AM - * ------------------------------------------------------------------------ - */ - -static bool -blackhole_scan_bitmap_next_block(TableScanDesc scan, - TBMIterateResult *tbmres) -{ - /* no data, so no point to scan next block */ - return false; -} - -static bool -blackhole_scan_bitmap_next_tuple(TableScanDesc scan, - TBMIterateResult *tbmres, - TupleTableSlot *slot) -{ - /* no data, so no point to scan next tuple */ - return false; -} - -static bool -blackhole_scan_sample_next_block(TableScanDesc scan, - SampleScanState *scanstate) -{ - /* no data, so no point to scan next block for sampling */ - return false; -} - -static bool -blackhole_scan_sample_next_tuple(TableScanDesc scan, - SampleScanState *scanstate, - TupleTableSlot *slot) -{ - /* no data, so no point to scan next tuple for sampling */ - return false; -} - - -/* ------------------------------------------------------------------------ - * Definition of the blackhole table access method. 
- * ------------------------------------------------------------------------ - */ - -static const TableAmRoutine blackhole_methods = { - .type = T_TableAmRoutine, - - .slot_callbacks = blackhole_slot_callbacks, - - .scan_begin = blackhole_scan_begin, - .scan_end = blackhole_scan_end, - .scan_rescan = blackhole_scan_rescan, - .scan_getnextslot = blackhole_scan_getnextslot, - - /* these are common helper functions */ - .parallelscan_estimate = table_block_parallelscan_estimate, - .parallelscan_initialize = table_block_parallelscan_initialize, - .parallelscan_reinitialize = table_block_parallelscan_reinitialize, - - .index_fetch_begin = blackhole_index_fetch_begin, - .index_fetch_reset = blackhole_index_fetch_reset, - .index_fetch_end = blackhole_index_fetch_end, - .index_fetch_tuple = blackhole_index_fetch_tuple, - - .tuple_insert = blackhole_tuple_insert, - .tuple_insert_speculative = blackhole_tuple_insert_speculative, - .tuple_complete_speculative = blackhole_tuple_complete_speculative, - .multi_insert = blackhole_multi_insert, - .tuple_delete = blackhole_tuple_delete, - .tuple_update = blackhole_tuple_update, - .tuple_lock = blackhole_tuple_lock, - .finish_bulk_insert = blackhole_finish_bulk_insert, - - .tuple_fetch_row_version = blackhole_fetch_row_version, - .tuple_get_latest_tid = blackhole_get_latest_tid, - .tuple_tid_valid = blackhole_tuple_tid_valid, - .tuple_satisfies_snapshot = blackhole_tuple_satisfies_snapshot, - .compute_xid_horizon_for_tuples = blackhole_compute_xid_horizon_for_tuples, - - .relation_set_new_filenode = blackhole_relation_set_new_filenode, - .relation_nontransactional_truncate = blackhole_relation_nontransactional_truncate, - .relation_copy_data = blackhole_copy_data, - .relation_copy_for_cluster = blackhole_copy_for_cluster, - .relation_vacuum = blackhole_vacuum, - .scan_analyze_next_block = blackhole_scan_analyze_next_block, - .scan_analyze_next_tuple = blackhole_scan_analyze_next_tuple, - .index_build_range_scan = 
blackhole_index_build_range_scan, - .index_validate_scan = blackhole_index_validate_scan, - - .relation_size = blackhole_relation_size, - .relation_needs_toast_table = blackhole_relation_needs_toast_table, - - .relation_estimate_size = blackhole_estimate_rel_size, - - .scan_bitmap_next_block = blackhole_scan_bitmap_next_block, - .scan_bitmap_next_tuple = blackhole_scan_bitmap_next_tuple, - .scan_sample_next_block = blackhole_scan_sample_next_block, - .scan_sample_next_tuple = blackhole_scan_sample_next_tuple -}; - - -Datum -blackhole_am_handler(PG_FUNCTION_ARGS) -{ - PG_RETURN_POINTER(&blackhole_methods); -} - -#endif diff --git a/src/backend/distributed/test/fake_am.c b/src/backend/distributed/test/fake_am.c new file mode 100644 index 000000000..2d743d36c --- /dev/null +++ b/src/backend/distributed/test/fake_am.c @@ -0,0 +1,591 @@ +/*------------------------------------------------------------------------- + * + * fake_am.c + * fake table access method code + * + * Copyright (c) Citus Data, Inc. + * + * IDENTIFICATION + * Based on https://github.com/michaelpq/pg_plugins/blob/master/blackhole_am/blackhole_am.c + * + * + * NOTES + * This file introduces the table access method "fake", which delegates + * bare minimum functionality for testing to heapam to provide an append + * only access method, and doesn't implement rest of the functionality. 
+ * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "distributed/pg_version_constants.h" + +#if PG_VERSION_NUM >= PG_VERSION_12 + +#include "access/amapi.h" +#include "access/heapam.h" +#include "access/tableam.h" +#include "access/multixact.h" +#include "access/xact.h" +#include "catalog/index.h" +#include "catalog/storage.h" +#include "catalog/storage_xlog.h" +#include "commands/vacuum.h" +#include "executor/tuptable.h" +#include "storage/smgr.h" +#include "utils/snapmgr.h" + +PG_FUNCTION_INFO_V1(fake_am_handler); + +static const TableAmRoutine fake_methods; + + +/* ------------------------------------------------------------------------ + * Slot related callbacks for fake AM + * ------------------------------------------------------------------------ + */ +static const TupleTableSlotOps * +fake_slot_callbacks(Relation relation) +{ + return &TTSOpsBufferHeapTuple; +} + + +/* ------------------------------------------------------------------------ + * Table Scan Callbacks for fake AM + * ------------------------------------------------------------------------ + */ +static TableScanDesc +fake_scan_begin(Relation relation, Snapshot snapshot, + int nkeys, ScanKey key, + ParallelTableScanDesc parallel_scan, + uint32 flags) +{ + return heap_beginscan(relation, snapshot, nkeys, key, parallel_scan, flags); +} + + +static void +fake_scan_end(TableScanDesc sscan) +{ + heap_endscan(sscan); +} + + +static void +fake_scan_rescan(TableScanDesc sscan, ScanKey key, bool set_params, + bool allow_strat, bool allow_sync, bool allow_pagemode) +{ + heap_rescan(sscan, key, set_params, allow_strat, allow_sync, allow_pagemode); +} + + +static bool +fake_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, + TupleTableSlot *slot) +{ + ereport(WARNING, (errmsg("fake_scan_getnextslot"))); + return heap_getnextslot(sscan, direction, slot); +} + + +/* 
------------------------------------------------------------------------ + * Index Scan Callbacks for fake AM + * ------------------------------------------------------------------------ + */ +static IndexFetchTableData * +fake_index_fetch_begin(Relation rel) +{ + elog(ERROR, "fake_index_fetch_begin not implemented"); + return NULL; +} + + +static void +fake_index_fetch_reset(IndexFetchTableData *scan) +{ + elog(ERROR, "fake_index_fetch_reset not implemented"); +} + + +static void +fake_index_fetch_end(IndexFetchTableData *scan) +{ + elog(ERROR, "fake_index_fetch_end not implemented"); +} + + +static bool +fake_index_fetch_tuple(struct IndexFetchTableData *scan, + ItemPointer tid, + Snapshot snapshot, + TupleTableSlot *slot, + bool *call_again, bool *all_dead) +{ + elog(ERROR, "fake_index_fetch_tuple not implemented"); + return false; +} + + +/* ------------------------------------------------------------------------ + * Callbacks for non-modifying operations on individual tuples for + * fake AM. 
+ * ------------------------------------------------------------------------ + */ +static bool +fake_fetch_row_version(Relation relation, + ItemPointer tid, + Snapshot snapshot, + TupleTableSlot *slot) +{ + elog(ERROR, "fake_fetch_row_version not implemented"); + return false; +} + + +static void +fake_get_latest_tid(TableScanDesc sscan, + ItemPointer tid) +{ + elog(ERROR, "fake_get_latest_tid not implemented"); +} + + +static bool +fake_tuple_tid_valid(TableScanDesc scan, ItemPointer tid) +{ + elog(ERROR, "fake_tuple_tid_valid not implemented"); + return false; +} + + +static bool +fake_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot, + Snapshot snapshot) +{ + elog(ERROR, "fake_tuple_satisfies_snapshot not implemented"); + return false; +} + + +static TransactionId +fake_compute_xid_horizon_for_tuples(Relation rel, + ItemPointerData *tids, + int nitems) +{ + elog(ERROR, "fake_compute_xid_horizon_for_tuples not implemented"); + return InvalidTransactionId; +} + + +/* ---------------------------------------------------------------------------- + * Functions for manipulations of physical tuples for fake AM. 
+ * ---------------------------------------------------------------------------- + */ +static void +fake_tuple_insert(Relation relation, TupleTableSlot *slot, + CommandId cid, int options, BulkInsertState bistate) +{ + ereport(WARNING, (errmsg("fake_tuple_insert"))); + + /* + * Code below this point is from heapam_tuple_insert from + * heapam_handler.c + */ + + bool shouldFree = true; + HeapTuple tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree); + + /* Update the tuple with table oid */ + slot->tts_tableOid = RelationGetRelid(relation); + tuple->t_tableOid = slot->tts_tableOid; + + /* Perform the insertion, and copy the resulting ItemPointer */ + heap_insert(relation, tuple, cid, options, bistate); + ItemPointerCopy(&tuple->t_self, &slot->tts_tid); + + if (shouldFree) + { + pfree(tuple); + } +} + + +static void +fake_tuple_insert_speculative(Relation relation, TupleTableSlot *slot, + CommandId cid, int options, + BulkInsertState bistate, + uint32 specToken) +{ + elog(ERROR, "fake_tuple_insert_speculative not implemented"); +} + + +static void +fake_tuple_complete_speculative(Relation relation, TupleTableSlot *slot, + uint32 spekToken, bool succeeded) +{ + elog(ERROR, "fake_tuple_complete_speculative not implemented"); +} + + +static void +fake_multi_insert(Relation relation, TupleTableSlot **slots, + int ntuples, CommandId cid, int options, + BulkInsertState bistate) +{ + ereport(WARNING, (errmsg("fake_multi_insert"))); + + heap_multi_insert(relation, slots, ntuples, cid, options, bistate); +} + + +static TM_Result +fake_tuple_delete(Relation relation, ItemPointer tid, CommandId cid, + Snapshot snapshot, Snapshot crosscheck, bool wait, + TM_FailureData *tmfd, bool changingPart) +{ + elog(ERROR, "fake_tuple_delete not implemented"); +} + + +static TM_Result +fake_tuple_update(Relation relation, ItemPointer otid, + TupleTableSlot *slot, CommandId cid, + Snapshot snapshot, Snapshot crosscheck, + bool wait, TM_FailureData *tmfd, + LockTupleMode *lockmode, bool 
*update_indexes) +{ + elog(ERROR, "fake_tuple_update not implemented"); +} + + +static TM_Result +fake_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot, + TupleTableSlot *slot, CommandId cid, LockTupleMode mode, + LockWaitPolicy wait_policy, uint8 flags, + TM_FailureData *tmfd) +{ + elog(ERROR, "fake_tuple_lock not implemented"); +} + + +static void +fake_finish_bulk_insert(Relation relation, int options) +{ + /* nothing to do here */ +} + + +/* ------------------------------------------------------------------------ + * DDL related callbacks for fake AM. + * ------------------------------------------------------------------------ + */ +static void +fake_relation_set_new_filenode(Relation rel, + const RelFileNode *newrnode, + char persistence, + TransactionId *freezeXid, + MultiXactId *minmulti) +{ + /* + * Code below is copied from heapam_relation_set_new_filenode in + * heapam_handler.c. + */ + + + /* + * Initialize to the minimum XID that could put tuples in the table. We + * know that no xacts older than RecentXmin are still running, so that + * will do. + */ + *freezeXid = RecentXmin; + + /* + * Similarly, initialize the minimum Multixact to the first value that + * could possibly be stored in tuples in the table. Running transactions + * could reuse values from their local cache, so we are careful to + * consider all currently running multis. + * + * XXX this could be refined further, but is it worth the hassle? + */ + *minmulti = GetOldestMultiXactId(); + + SMgrRelation srel = RelationCreateStorage(*newrnode, persistence); + + /* + * If required, set up an init fork for an unlogged table so that it can + * be correctly reinitialized on restart. An immediate sync is required + * even if the page has been logged, because the write did not go through + * shared_buffers and therefore a concurrent checkpoint may have moved the + * redo pointer past our xlog record. 
 Recovery may as well remove it + while replaying, for example, XLOG_DBASE_CREATE or XLOG_TBLSPC_CREATE + * record. Therefore, logging is necessary even if wal_level=minimal. + */ + if (persistence == RELPERSISTENCE_UNLOGGED) + { + Assert(rel->rd_rel->relkind == RELKIND_RELATION || + rel->rd_rel->relkind == RELKIND_MATVIEW || + rel->rd_rel->relkind == RELKIND_TOASTVALUE); + smgrcreate(srel, INIT_FORKNUM, false); + log_smgrcreate(newrnode, INIT_FORKNUM); + smgrimmedsync(srel, INIT_FORKNUM); + } + + smgrclose(srel); +} + + +static void +fake_relation_nontransactional_truncate(Relation rel) +{ + elog(ERROR, "fake_relation_nontransactional_truncate not implemented"); +} + + +static void +fake_copy_data(Relation rel, const RelFileNode *newrnode) +{ + elog(ERROR, "fake_copy_data not implemented"); +} + + +static void +fake_copy_for_cluster(Relation OldTable, Relation NewTable, + Relation OldIndex, bool use_sort, + TransactionId OldestXmin, + TransactionId *xid_cutoff, + MultiXactId *multi_cutoff, + double *num_tuples, + double *tups_vacuumed, + double *tups_recently_dead) +{ + elog(ERROR, "fake_copy_for_cluster not implemented"); +} + + +static void +fake_vacuum(Relation onerel, VacuumParams *params, + BufferAccessStrategy bstrategy) +{ + elog(WARNING, "fake_vacuum not implemented"); +} + + +static bool +fake_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno, + BufferAccessStrategy bstrategy) +{ + /* we don't support analyze, so return false */ + return false; +} + + +static bool +fake_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin, + double *liverows, double *deadrows, + TupleTableSlot *slot) +{ + elog(ERROR, "fake_scan_analyze_next_tuple not implemented"); +} + + +static double +fake_index_build_range_scan(Relation tableRelation, + Relation indexRelation, + IndexInfo *indexInfo, + bool allow_sync, + bool anyvisible, + bool progress, + BlockNumber start_blockno, + BlockNumber numblocks, + IndexBuildCallback callback, + void 
*callback_state, + TableScanDesc scan) +{ + elog(ERROR, "fake_index_build_range_scan not implemented"); +} + + +static void +fake_index_validate_scan(Relation tableRelation, + Relation indexRelation, + IndexInfo *indexInfo, + Snapshot snapshot, + ValidateIndexState *state) +{ + elog(ERROR, "fake_index_validate_scan not implemented"); +} + + +/* ------------------------------------------------------------------------ + * Miscellaneous callbacks for the fake AM + * ------------------------------------------------------------------------ + */ +static uint64 +fake_relation_size(Relation rel, ForkNumber forkNumber) +{ + /* + * Code below is copied from heapam_relation_size from + * heapam_handler.c. + */ + + uint64 nblocks = 0; + + /* Open it at the smgr level if not already done */ + RelationOpenSmgr(rel); + + /* InvalidForkNumber indicates returning the size for all forks */ + if (forkNumber == InvalidForkNumber) + { + for (int i = 0; i < MAX_FORKNUM; i++) + { + nblocks += smgrnblocks(rel->rd_smgr, i); + } + } + else + { + nblocks = smgrnblocks(rel->rd_smgr, forkNumber); + } + + return nblocks * BLCKSZ; +} + + +/* + * Check to see whether the table needs a TOAST table.
+ */ +static bool +fake_relation_needs_toast_table(Relation rel) +{ + /* we don't test toastable data with this, so no toast table needed */ + return false; +} + + +/* ------------------------------------------------------------------------ + * Planner related callbacks for the fake AM + * ------------------------------------------------------------------------ + */ +static void +fake_estimate_rel_size(Relation rel, int32 *attr_widths, + BlockNumber *pages, double *tuples, + double *allvisfrac) +{ + /* no data available */ + *attr_widths = 0; + *tuples = 0; + *allvisfrac = 0; + *pages = 0; +} + + +/* ------------------------------------------------------------------------ + * Executor related callbacks for the fake AM + * ------------------------------------------------------------------------ + */ +static bool +fake_scan_bitmap_next_block(TableScanDesc scan, + TBMIterateResult *tbmres) +{ + elog(ERROR, "fake_scan_bitmap_next_block not implemented"); +} + + +static bool +fake_scan_bitmap_next_tuple(TableScanDesc scan, + TBMIterateResult *tbmres, + TupleTableSlot *slot) +{ + elog(ERROR, "fake_scan_bitmap_next_tuple not implemented"); +} + + +static bool +fake_scan_sample_next_block(TableScanDesc scan, + SampleScanState *scanstate) +{ + elog(ERROR, "fake_scan_sample_next_block not implemented"); +} + + +static bool +fake_scan_sample_next_tuple(TableScanDesc scan, + SampleScanState *scanstate, + TupleTableSlot *slot) +{ + elog(ERROR, "fake_scan_sample_next_tuple not implemented"); +} + + +/* ------------------------------------------------------------------------ + * Definition of the fake table access method. 
+ * ------------------------------------------------------------------------ + */ + +static const TableAmRoutine fake_methods = { + .type = T_TableAmRoutine, + + .slot_callbacks = fake_slot_callbacks, + + .scan_begin = fake_scan_begin, + .scan_end = fake_scan_end, + .scan_rescan = fake_scan_rescan, + .scan_getnextslot = fake_scan_getnextslot, + + /* these are common helper functions */ + .parallelscan_estimate = table_block_parallelscan_estimate, + .parallelscan_initialize = table_block_parallelscan_initialize, + .parallelscan_reinitialize = table_block_parallelscan_reinitialize, + + .index_fetch_begin = fake_index_fetch_begin, + .index_fetch_reset = fake_index_fetch_reset, + .index_fetch_end = fake_index_fetch_end, + .index_fetch_tuple = fake_index_fetch_tuple, + + .tuple_insert = fake_tuple_insert, + .tuple_insert_speculative = fake_tuple_insert_speculative, + .tuple_complete_speculative = fake_tuple_complete_speculative, + .multi_insert = fake_multi_insert, + .tuple_delete = fake_tuple_delete, + .tuple_update = fake_tuple_update, + .tuple_lock = fake_tuple_lock, + .finish_bulk_insert = fake_finish_bulk_insert, + + .tuple_fetch_row_version = fake_fetch_row_version, + .tuple_get_latest_tid = fake_get_latest_tid, + .tuple_tid_valid = fake_tuple_tid_valid, + .tuple_satisfies_snapshot = fake_tuple_satisfies_snapshot, + .compute_xid_horizon_for_tuples = fake_compute_xid_horizon_for_tuples, + + .relation_set_new_filenode = fake_relation_set_new_filenode, + .relation_nontransactional_truncate = fake_relation_nontransactional_truncate, + .relation_copy_data = fake_copy_data, + .relation_copy_for_cluster = fake_copy_for_cluster, + .relation_vacuum = fake_vacuum, + .scan_analyze_next_block = fake_scan_analyze_next_block, + .scan_analyze_next_tuple = fake_scan_analyze_next_tuple, + .index_build_range_scan = fake_index_build_range_scan, + .index_validate_scan = fake_index_validate_scan, + + .relation_size = fake_relation_size, + .relation_needs_toast_table = 
fake_relation_needs_toast_table, + + .relation_estimate_size = fake_estimate_rel_size, + + .scan_bitmap_next_block = fake_scan_bitmap_next_block, + .scan_bitmap_next_tuple = fake_scan_bitmap_next_tuple, + .scan_sample_next_block = fake_scan_sample_next_block, + .scan_sample_next_tuple = fake_scan_sample_next_tuple +}; + + +Datum +fake_am_handler(PG_FUNCTION_ARGS) +{ + PG_RETURN_POINTER(&fake_methods); +} + + +#endif diff --git a/src/include/distributed/version_compat.h b/src/include/distributed/version_compat.h index a90f7d056..e59734501 100644 --- a/src/include/distributed/version_compat.h +++ b/src/include/distributed/version_compat.h @@ -15,10 +15,13 @@ #include "distributed/pg_version_constants.h" +#include "access/sdir.h" +#include "access/heapam.h" #include "commands/explain.h" #include "catalog/namespace.h" #include "distributed/citus_ruleutils.h" #include "distributed/citus_safe_lib.h" +#include "executor/tuptable.h" #include "nodes/parsenodes.h" #include "parser/parse_func.h" #if (PG_VERSION_NUM >= PG_VERSION_12) @@ -61,6 +64,7 @@ #endif #if PG_VERSION_NUM >= PG_VERSION_12 +#define CreateTableSlotForRel(rel) table_slot_create(rel, NULL) #define MakeSingleTupleTableSlotCompat MakeSingleTupleTableSlot #define AllocSetContextCreateExtended AllocSetContextCreateInternal #define NextCopyFromCompat NextCopyFrom @@ -122,6 +126,7 @@ FileCompatFromFileStart(File fileDesc) #else /* pre PG12 */ +#define CreateTableSlotForRel(rel) MakeSingleTupleTableSlot(RelationGetDescr(rel)) #define table_open(r, l) heap_open(r, l) #define table_openrv(r, l) heap_openrv(r, l) #define table_openrv_extended(r, l, m) heap_openrv_extended(r, l, m) @@ -184,6 +189,23 @@ FileCompatFromFileStart(File fileDesc) } +/* + * postgres 11 equivalent for a function with the same name in postgres 12+. 
+ */ +static inline bool +table_scan_getnextslot(HeapScanDesc scan, ScanDirection dir, TupleTableSlot *slot) +{ + HeapTuple tuple = heap_getnext(scan, ForwardScanDirection); + if (tuple == NULL) + { + return false; + } + + ExecStoreTuple(tuple, slot, InvalidBuffer, false); + return true; +} + + #endif /* PG12 */ #define fcSetArg(fc, n, value) fcSetArgExt(fc, n, value, false) diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index ca75eaf68..f1069faaa 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -14,6 +14,9 @@ s/shard [0-9]+/shard xxxxx/g s/assigned task [0-9]+ to node/assigned task to node/ s/node group [12] (but|does)/node group \1/ +# discard "USING heap" in "CREATE TABLE ... USING heap" +s/CREATE(.*)TABLE(.*)USING heap/CREATE\1TABLE\2/g + # Differing names can have differing table column widths s/^-[+-]{2,}$/---------------------------------------------------------------------/g @@ -100,6 +103,7 @@ s/_id_ref_id_fkey/_id_fkey/g s/_ref_id_id_fkey_/_ref_id_fkey_/g s/fk_test_2_col1_col2_fkey/fk_test_2_col1_fkey/g s/_id_other_column_ref_fkey/_id_fkey/g +s/"(collections_list_|collection_users_|collection_users_fkey_)[0-9]+"/"\1xxxxxxx"/g # pg13 changes s/of relation ".*" violates not-null constraint/violates not-null constraint/g diff --git a/src/test/regress/expected/pg12.out b/src/test/regress/expected/pg12.out index 65d31a78b..d2b07de95 100644 --- a/src/test/regress/expected/pg12.out +++ b/src/test/regress/expected/pg12.out @@ -10,16 +10,6 @@ SET citus.next_shard_id TO 60000; SET citus.next_placement_id TO 60000; create schema test_pg12; set search_path to test_pg12; -CREATE FUNCTION blackhole_am_handler(internal) -RETURNS table_am_handler -AS 'citus' -LANGUAGE C; -CREATE ACCESS METHOD blackhole_am TYPE TABLE HANDLER blackhole_am_handler; -create table test_am(id int, val int) using blackhole_am; -insert into test_am values (1, 1); --- Custom table access methods should be rejected 
-select create_distributed_table('test_am','id'); -ERROR: cannot distribute relations using non-heap access methods -- Test generated columns -- val1 after val2 to test https://github.com/citusdata/citus/issues/3538 create table gen1 ( @@ -302,8 +292,8 @@ HINT: To remove the local data, run: SELECT truncate_local_data_after_distribut -- should still fail because of fkey INSERT INTO collection_users VALUES (1, 1000, 1); -ERROR: insert or update on table "collection_users_60028" violates foreign key constraint "collection_users_fkey_60028" -DETAIL: Key (key, collection_id)=(1, 1000) is not present in table "collections_list_60016". +ERROR: insert or update on table "collection_users_xxxxxxx" violates foreign key constraint "collection_users_fkey_xxxxxxx" +DETAIL: Key (key, collection_id)=(1, 1000) is not present in table "collections_list_xxxxxxx". CONTEXT: while executing command on localhost:xxxxx -- whereas new record with partition should go through INSERT INTO collections_list VALUES (2, 1, '1.2'); @@ -423,6 +413,6 @@ where val = 'asdf'; \set VERBOSITY terse drop schema test_pg12 cascade; -NOTICE: drop cascades to 13 other objects +NOTICE: drop cascades to 10 other objects \set VERBOSITY default SET citus.shard_replication_factor to 2; diff --git a/src/test/regress/expected/tableam.out b/src/test/regress/expected/tableam.out new file mode 100644 index 000000000..333380503 --- /dev/null +++ b/src/test/regress/expected/tableam.out @@ -0,0 +1,323 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int > 11 AS server_version_above_eleven +\gset +\if :server_version_above_eleven +\else +\q +\endif +SET citus.shard_replication_factor to 1; +SET citus.next_shard_id TO 60000; +SET citus.next_placement_id TO 60000; +SET citus.shard_count TO 4; +create schema test_tableam; +set search_path to test_tableam; +SELECT public.run_command_on_coordinator_and_workers($Q$ + CREATE FUNCTION fake_am_handler(internal) + RETURNS table_am_handler + AS 'citus' + 
LANGUAGE C; + CREATE ACCESS METHOD fake_am TYPE TABLE HANDLER fake_am_handler; +$Q$); + run_command_on_coordinator_and_workers +--------------------------------------------------------------------- + +(1 row) + +-- +-- Hash distributed table using a non-default table access method +-- +create table test_hash_dist(id int, val int) using fake_am; +insert into test_hash_dist values (1, 1); +WARNING: fake_tuple_insert +select create_distributed_table('test_hash_dist','id'); +WARNING: fake_scan_getnextslot +CONTEXT: SQL statement "SELECT EXISTS (SELECT 1 FROM test_tableam.test_hash_dist)" +WARNING: fake_scan_getnextslot +NOTICE: Copying data from local table... +WARNING: fake_scan_getnextslot +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$test_tableam.test_hash_dist$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +select * from test_hash_dist; +WARNING: fake_scan_getnextslot +DETAIL: from localhost:xxxxx +WARNING: fake_scan_getnextslot +DETAIL: from localhost:xxxxx +WARNING: fake_scan_getnextslot +DETAIL: from localhost:xxxxx +WARNING: fake_scan_getnextslot +DETAIL: from localhost:xxxxx +WARNING: fake_scan_getnextslot +DETAIL: from localhost:xxxxx + id | val +--------------------------------------------------------------------- + 1 | 1 +(1 row) + +insert into test_hash_dist values (1, 1); +WARNING: fake_tuple_insert +DETAIL: from localhost:xxxxx +-- we should error on following, since this AM is append only +SET client_min_messages TO ERROR; +delete from test_hash_dist where id=1; +ERROR: fake_tuple_delete not implemented +CONTEXT: while executing command on localhost:xxxxx +update test_hash_dist set val=2 where id=2; +RESET client_min_messages; +-- ddl events should include "USING fake_am" +SELECT * FROM 
master_get_table_ddl_events('test_hash_dist'); + master_get_table_ddl_events +--------------------------------------------------------------------- + CREATE TABLE test_tableam.test_hash_dist (id integer, val integer) USING fake_am + ALTER TABLE test_tableam.test_hash_dist OWNER TO postgres +(2 rows) + +-- +-- Reference table using a non-default table access method +-- +create table test_ref(a int) using fake_am; +insert into test_ref values (1); +WARNING: fake_tuple_insert +select create_reference_table('test_ref'); +WARNING: fake_scan_getnextslot +CONTEXT: SQL statement "SELECT EXISTS (SELECT 1 FROM test_tableam.test_ref)" +WARNING: fake_scan_getnextslot +NOTICE: Copying data from local table... +WARNING: fake_scan_getnextslot +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$test_tableam.test_ref$$) + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +select * from test_ref; +WARNING: fake_scan_getnextslot +DETAIL: from localhost:xxxxx +WARNING: fake_scan_getnextslot +DETAIL: from localhost:xxxxx + a +--------------------------------------------------------------------- + 1 +(1 row) + +insert into test_ref values (1); +WARNING: fake_tuple_insert +DETAIL: from localhost:xxxxx +WARNING: fake_tuple_insert +DETAIL: from localhost:xxxxx +-- we should error on following, since this AM is append only +SET client_min_messages TO ERROR; +delete from test_ref; +ERROR: fake_tuple_delete not implemented +CONTEXT: while executing command on localhost:xxxxx +update test_ref set a=2; +ERROR: fake_tuple_update not implemented +CONTEXT: while executing command on localhost:xxxxx +RESET client_min_messages; +-- ddl events should include "USING fake_am" +SELECT * FROM master_get_table_ddl_events('test_ref'); + master_get_table_ddl_events 
+--------------------------------------------------------------------- + CREATE TABLE test_tableam.test_ref (a integer) USING fake_am + ALTER TABLE test_tableam.test_ref OWNER TO postgres +(2 rows) + +-- replicate to coordinator +SET client_min_messages TO WARNING; +SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +RESET client_min_messages; +delete from test_ref; +WARNING: fake_scan_getnextslot +ERROR: fake_tuple_delete not implemented +SELECT master_remove_node('localhost', :master_port); + master_remove_node +--------------------------------------------------------------------- + +(1 row) + +-- +-- Range partitioned table using a non-default table access method +-- +CREATE TABLE test_range_dist(id int, val int) using fake_am; +SELECT create_distributed_table('test_range_dist', 'id', 'range'); +WARNING: fake_scan_getnextslot +CONTEXT: SQL statement "SELECT EXISTS (SELECT 1 FROM test_tableam.test_range_dist)" +WARNING: fake_scan_getnextslot +CONTEXT: SQL statement "SELECT EXISTS (SELECT 1 FROM test_tableam.test_range_dist)" + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CALL public.create_range_partitioned_shards('test_range_dist', '{"0","25"}','{"24","49"}'); +select * from test_range_dist; +WARNING: fake_scan_getnextslot +DETAIL: from localhost:xxxxx +WARNING: fake_scan_getnextslot +DETAIL: from localhost:xxxxx + id | val +--------------------------------------------------------------------- +(0 rows) + +insert into test_range_dist values (1, 1); +WARNING: fake_tuple_insert +DETAIL: from localhost:xxxxx +COPY test_range_dist FROM PROGRAM 'echo 0, 0 && echo 1, -1 && echo 2, 4 && echo 3, 9' WITH CSV; +COPY test_range_dist FROM PROGRAM 'echo 25, 16 && echo 26, 1 && echo 27, 4 && echo 7, 9' WITH CSV; +-- ddl events should include "USING fake_am" +SELECT * FROM 
master_get_table_ddl_events('test_range_dist'); + master_get_table_ddl_events +--------------------------------------------------------------------- + CREATE TABLE test_tableam.test_range_dist (id integer, val integer) USING fake_am + ALTER TABLE test_tableam.test_range_dist OWNER TO postgres +(2 rows) + +-- +-- Test master_copy_shard_placement with a fake_am table +-- +select a.shardid, a.nodeport +FROM pg_dist_shard b, pg_dist_shard_placement a +WHERE a.shardid=b.shardid AND logicalrelid = 'test_range_dist'::regclass::oid +ORDER BY a.shardid, nodeport; + shardid | nodeport +--------------------------------------------------------------------- + 60005 | 57637 + 60006 | 57638 +(2 rows) + +SELECT master_copy_shard_placement( + get_shard_id_for_distribution_column('test_range_dist', '1'), + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + do_repair := false); + master_copy_shard_placement +--------------------------------------------------------------------- + +(1 row) + +select a.shardid, a.nodeport +FROM pg_dist_shard b, pg_dist_shard_placement a +WHERE a.shardid=b.shardid AND logicalrelid = 'test_range_dist'::regclass::oid +ORDER BY a.shardid, nodeport; + shardid | nodeport +--------------------------------------------------------------------- + 60005 | 57637 + 60005 | 57638 + 60006 | 57638 +(3 rows) + +-- verify that data was copied correctly +\c - - - :worker_1_port +select * from test_tableam.test_range_dist_60005 ORDER BY id; +WARNING: fake_scan_getnextslot +WARNING: fake_scan_getnextslot +WARNING: fake_scan_getnextslot +WARNING: fake_scan_getnextslot +WARNING: fake_scan_getnextslot +WARNING: fake_scan_getnextslot +WARNING: fake_scan_getnextslot + id | val +--------------------------------------------------------------------- + 0 | 0 + 1 | 1 + 1 | -1 + 2 | 4 + 3 | 9 + 7 | 9 +(6 rows) + +\c - - - :worker_2_port +select * from test_tableam.test_range_dist_60005 ORDER BY id; +WARNING: fake_scan_getnextslot +WARNING: fake_scan_getnextslot +WARNING: 
fake_scan_getnextslot +WARNING: fake_scan_getnextslot +WARNING: fake_scan_getnextslot +WARNING: fake_scan_getnextslot +WARNING: fake_scan_getnextslot + id | val +--------------------------------------------------------------------- + 0 | 0 + 1 | 1 + 1 | -1 + 2 | 4 + 3 | 9 + 7 | 9 +(6 rows) + +\c - - - :master_port +-- +-- Test that partitioned tables work correctly with a fake_am table +-- +-- parent using default am, one of children using fake_am +CREATE TABLE test_partitioned(id int, p int, val int) +PARTITION BY RANGE (p); +CREATE TABLE test_partitioned_p1 PARTITION OF test_partitioned + FOR VALUES FROM (1) TO (10); +CREATE TABLE test_partitioned_p2 PARTITION OF test_partitioned + FOR VALUES FROM (11) TO (20) USING fake_am; +INSERT INTO test_partitioned VALUES (1, 5, -1), (2, 15, -2); +WARNING: fake_tuple_insert +SELECT create_distributed_table('test_partitioned', 'id'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.test_partitioned_p1$$) +WARNING: fake_scan_getnextslot +CONTEXT: SQL statement "SELECT EXISTS (SELECT 1 FROM public.test_partitioned_p2)" +WARNING: fake_scan_getnextslot +NOTICE: Copying data from local table... +WARNING: fake_scan_getnextslot +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. 
+HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.test_partitioned_p2$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO test_partitioned VALUES (3, 6, -6), (4, 16, -4); +WARNING: fake_tuple_insert +DETAIL: from localhost:xxxxx +WARNING: fake_tuple_insert +DETAIL: from localhost:xxxxx +SELECT count(*) FROM test_partitioned; +WARNING: fake_scan_getnextslot +DETAIL: from localhost:xxxxx +WARNING: fake_scan_getnextslot +DETAIL: from localhost:xxxxx +WARNING: fake_scan_getnextslot +DETAIL: from localhost:xxxxx +WARNING: fake_scan_getnextslot +DETAIL: from localhost:xxxxx +WARNING: fake_scan_getnextslot +DETAIL: from localhost:xxxxx +WARNING: fake_scan_getnextslot +DETAIL: from localhost:xxxxx + count +--------------------------------------------------------------------- + 4 +(1 row) + +DROP TABLE test_partitioned; +-- Specifying access method in parent is not supported. +-- If the below statement ever succeeds, add more tests for +-- the case where children inherit access method from parent. 
+CREATE TABLE test_partitioned(id int, p int, val int) +PARTITION BY RANGE (p) USING fake_am; +ERROR: specifying a table access method is not supported on a partitioned table +\set VERBOSITY terse +drop schema test_tableam cascade; +NOTICE: drop cascades to 6 other objects diff --git a/src/test/regress/expected/tableam_0.out b/src/test/regress/expected/tableam_0.out new file mode 100644 index 000000000..9d92533f9 --- /dev/null +++ b/src/test/regress/expected/tableam_0.out @@ -0,0 +1,6 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int > 11 AS server_version_above_eleven +\gset +\if :server_version_above_eleven +\else +\q diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index a20731e03..75aa46b3f 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -82,6 +82,7 @@ test: set_operation_and_local_tables test: subqueries_deep subquery_view subquery_partitioning subquery_complex_target_list subqueries_not_supported subquery_in_where test: non_colocated_leaf_subquery_joins non_colocated_subquery_joins non_colocated_join_order test: subquery_prepared_statements pg12 cte_inline pg13 +test: tableam # ---------- # Miscellaneous tests to check our query planning behavior diff --git a/src/test/regress/sql/pg12.sql b/src/test/regress/sql/pg12.sql index 95f093563..7b3c0be6f 100644 --- a/src/test/regress/sql/pg12.sql +++ b/src/test/regress/sql/pg12.sql @@ -13,17 +13,6 @@ SET citus.next_placement_id TO 60000; create schema test_pg12; set search_path to test_pg12; -CREATE FUNCTION blackhole_am_handler(internal) -RETURNS table_am_handler -AS 'citus' -LANGUAGE C; -CREATE ACCESS METHOD blackhole_am TYPE TABLE HANDLER blackhole_am_handler; - -create table test_am(id int, val int) using blackhole_am; -insert into test_am values (1, 1); --- Custom table access methods should be rejected -select create_distributed_table('test_am','id'); - -- Test generated columns -- val1 after val2 to test 
https://github.com/citusdata/citus/issues/3538 create table gen1 ( diff --git a/src/test/regress/sql/tableam.sql b/src/test/regress/sql/tableam.sql new file mode 100644 index 000000000..68b127300 --- /dev/null +++ b/src/test/regress/sql/tableam.sql @@ -0,0 +1,149 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int > 11 AS server_version_above_eleven +\gset +\if :server_version_above_eleven +\else +\q +\endif + +SET citus.shard_replication_factor to 1; +SET citus.next_shard_id TO 60000; +SET citus.next_placement_id TO 60000; +SET citus.shard_count TO 4; + +create schema test_tableam; +set search_path to test_tableam; + +SELECT public.run_command_on_coordinator_and_workers($Q$ + CREATE FUNCTION fake_am_handler(internal) + RETURNS table_am_handler + AS 'citus' + LANGUAGE C; + CREATE ACCESS METHOD fake_am TYPE TABLE HANDLER fake_am_handler; +$Q$); + +-- +-- Hash distributed table using a non-default table access method +-- + +create table test_hash_dist(id int, val int) using fake_am; +insert into test_hash_dist values (1, 1); +select create_distributed_table('test_hash_dist','id'); + +select * from test_hash_dist; +insert into test_hash_dist values (1, 1); + +-- we should error on following, since this AM is append only +SET client_min_messages TO ERROR; +delete from test_hash_dist where id=1; +update test_hash_dist set val=2 where id=2; +RESET client_min_messages; + +-- ddl events should include "USING fake_am" +SELECT * FROM master_get_table_ddl_events('test_hash_dist'); + +-- +-- Reference table using a non-default table access method +-- + +create table test_ref(a int) using fake_am; +insert into test_ref values (1); +select create_reference_table('test_ref'); + +select * from test_ref; +insert into test_ref values (1); + +-- we should error on following, since this AM is append only +SET client_min_messages TO ERROR; +delete from test_ref; +update test_ref set a=2; +RESET client_min_messages; + +-- ddl events should include "USING 
fake_am" +SELECT * FROM master_get_table_ddl_events('test_ref'); + +-- replicate to coordinator +SET client_min_messages TO WARNING; +SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); +RESET client_min_messages; +delete from test_ref; +SELECT master_remove_node('localhost', :master_port); + +-- +-- Range partitioned table using a non-default table access method +-- + +CREATE TABLE test_range_dist(id int, val int) using fake_am; +SELECT create_distributed_table('test_range_dist', 'id', 'range'); +CALL public.create_range_partitioned_shards('test_range_dist', '{"0","25"}','{"24","49"}'); + +select * from test_range_dist; +insert into test_range_dist values (1, 1); +COPY test_range_dist FROM PROGRAM 'echo 0, 0 && echo 1, -1 && echo 2, 4 && echo 3, 9' WITH CSV; +COPY test_range_dist FROM PROGRAM 'echo 25, 16 && echo 26, 1 && echo 27, 4 && echo 7, 9' WITH CSV; + + +-- ddl events should include "USING fake_am" +SELECT * FROM master_get_table_ddl_events('test_range_dist'); + +-- +-- Test master_copy_shard_placement with a fake_am table +-- + +select a.shardid, a.nodeport +FROM pg_dist_shard b, pg_dist_shard_placement a +WHERE a.shardid=b.shardid AND logicalrelid = 'test_range_dist'::regclass::oid +ORDER BY a.shardid, nodeport; + +SELECT master_copy_shard_placement( + get_shard_id_for_distribution_column('test_range_dist', '1'), + 'localhost', :worker_1_port, + 'localhost', :worker_2_port, + do_repair := false); + +select a.shardid, a.nodeport +FROM pg_dist_shard b, pg_dist_shard_placement a +WHERE a.shardid=b.shardid AND logicalrelid = 'test_range_dist'::regclass::oid +ORDER BY a.shardid, nodeport; + +-- verify that data was copied correctly + +\c - - - :worker_1_port +select * from test_tableam.test_range_dist_60005 ORDER BY id; + +\c - - - :worker_2_port +select * from test_tableam.test_range_dist_60005 ORDER BY id; + +\c - - - :master_port + +-- +-- Test that partitioned tables work correctly with a fake_am table +-- + +-- parent using default am, 
one of children using fake_am +CREATE TABLE test_partitioned(id int, p int, val int) +PARTITION BY RANGE (p); + +CREATE TABLE test_partitioned_p1 PARTITION OF test_partitioned + FOR VALUES FROM (1) TO (10); +CREATE TABLE test_partitioned_p2 PARTITION OF test_partitioned + FOR VALUES FROM (11) TO (20) USING fake_am; + +INSERT INTO test_partitioned VALUES (1, 5, -1), (2, 15, -2); + +SELECT create_distributed_table('test_partitioned', 'id'); + +INSERT INTO test_partitioned VALUES (3, 6, -6), (4, 16, -4); + +SELECT count(*) FROM test_partitioned; + +DROP TABLE test_partitioned; + +-- Specifying access method in parent is not supported. +-- If the below statement ever succeeds, add more tests for +-- the case where children inherit access method from parent. +CREATE TABLE test_partitioned(id int, p int, val int) +PARTITION BY RANGE (p) USING fake_am; + +\set VERBOSITY terse +drop schema test_tableam cascade;