mirror of https://github.com/citusdata/citus.git

commit caabbf4b84 (parent 7cb07c70fa)
Table access method support for distributed tables
@@ -119,7 +119,6 @@ static List * TupleDescColumnNameList(TupleDesc tupleDescriptor);
 static bool RelationUsesIdentityColumns(TupleDesc relationDesc);
 static bool DistributionColumnUsesGeneratedStoredColumn(TupleDesc relationDesc,
                                                         Var *distributionColumn);
-static bool RelationUsesHeapAccessMethodOrNone(Relation relation);
 static bool CanUseExclusiveConnections(Oid relationId, bool localTableEmpty);
 static void DoCopyFromLocalTableIntoShards(Relation distributedRelation,
                                            DestReceiver *copyDest,

@@ -156,16 +155,6 @@ master_create_distributed_table(PG_FUNCTION_ARGS)

     char *colocateWithTableName = NULL;
     bool viaDeprecatedAPI = true;
-    ObjectAddress tableAddress = { 0 };
-
-    /*
-     * distributed tables might have dependencies on different objects, since we create
-     * shards for a distributed table via multiple sessions these objects will be created
-     * via their own connection and committed immediately so they become visible to all
-     * sessions creating shards.
-     */
-    ObjectAddressSet(tableAddress, RelationRelationId, relationId);
-    EnsureDependenciesExistOnAllNodes(&tableAddress);

     /*
      * Lock target relation with an exclusive lock - there's no way to make

@@ -203,8 +192,6 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
 Datum
 create_distributed_table(PG_FUNCTION_ARGS)
 {
-    ObjectAddress tableAddress = { 0 };
-
     bool viaDeprecatedAPI = false;

     Oid relationId = PG_GETARG_OID(0);

@@ -216,15 +203,6 @@ create_distributed_table(PG_FUNCTION_ARGS)

     EnsureCitusTableCanBeCreated(relationId);
-
-    /*
-     * distributed tables might have dependencies on different objects, since we create
-     * shards for a distributed table via multiple sessions these objects will be created
-     * via their own connection and committed immediately so they become visible to all
-     * sessions creating shards.
-     */
-    ObjectAddressSet(tableAddress, RelationRelationId, relationId);
-    EnsureDependenciesExistOnAllNodes(&tableAddress);

     /*
      * Lock target relation with an exclusive lock - there's no way to make
      * sense of this table until we've committed, and we don't want multiple

@@ -267,7 +245,6 @@ create_reference_table(PG_FUNCTION_ARGS)

     char *colocateWithTableName = NULL;
     Var *distributionColumn = NULL;
-    ObjectAddress tableAddress = { 0 };

     bool viaDeprecatedAPI = false;

@@ -275,15 +252,6 @@ create_reference_table(PG_FUNCTION_ARGS)

     EnsureCitusTableCanBeCreated(relationId);
-
-    /*
-     * distributed tables might have dependencies on different objects, since we create
-     * shards for a distributed table via multiple sessions these objects will be created
-     * via their own connection and committed immediately so they become visible to all
-     * sessions creating shards.
-     */
-    ObjectAddressSet(tableAddress, RelationRelationId, relationId);
-    EnsureDependenciesExistOnAllNodes(&tableAddress);

     /*
      * Lock target relation with an exclusive lock - there's no way to make
      * sense of this table until we've committed, and we don't want multiple

@@ -387,6 +355,16 @@ void
 CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributionMethod,
                        char *colocateWithTableName, bool viaDeprecatedAPI)
 {
+    /*
+     * distributed tables might have dependencies on different objects, since we create
+     * shards for a distributed table via multiple sessions these objects will be created
+     * via their own connection and committed immediately so they become visible to all
+     * sessions creating shards.
+     */
+    ObjectAddress tableAddress = { 0 };
+    ObjectAddressSet(tableAddress, RelationRelationId, relationId);
+    EnsureDependenciesExistOnAllNodes(&tableAddress);
+
     char replicationModel = DecideReplicationModel(distributionMethod,
                                                    viaDeprecatedAPI);

@@ -704,12 +682,6 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,

     ErrorIfTableIsACatalogTable(relation);

-    if (!RelationUsesHeapAccessMethodOrNone(relation))
-    {
-        ereport(ERROR, (errmsg(
-                            "cannot distribute relations using non-heap access methods")));
-    }
-
 #if PG_VERSION_NUM < PG_VERSION_12

     /* verify target relation does not use WITH (OIDS) PostgreSQL feature */

@@ -1376,8 +1348,7 @@ CopyLocalDataIntoShards(Oid distributedRelationId)

     /* get the table columns */
     TupleDesc tupleDescriptor = RelationGetDescr(distributedRelation);
-    TupleTableSlot *slot = MakeSingleTupleTableSlotCompat(tupleDescriptor,
-                                                          &TTSOpsHeapTuple);
+    TupleTableSlot *slot = CreateTableSlotForRel(distributedRelation);
     List *columnNameList = TupleDescColumnNameList(tupleDescriptor);

     int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX;
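For reference, CreateTableSlotForRel() used above is a compat macro this commit adds to version_compat.h (see that hunk further down). A minimal sketch of what the new call site expands to on each supported major version, assembled from those macro definitions:

    #if PG_VERSION_NUM >= PG_VERSION_12
    /* PG12+: the table's access method picks the slot implementation */
    TupleTableSlot *slot = table_slot_create(distributedRelation, NULL);
    #else
    /* PG11: a plain heap-tuple slot over the relation's tuple descriptor */
    TupleTableSlot *slot = MakeSingleTupleTableSlot(RelationGetDescr(distributedRelation));
    #endif

Letting the access method choose the slot type is what allows the copy path below to read tables whose AM does not hand out plain heap tuples.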
@@ -1441,15 +1412,9 @@ DoCopyFromLocalTableIntoShards(Relation distributedRelation,
     MemoryContext oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));

     uint64 rowsCopied = 0;
-    HeapTuple tuple = NULL;
-    while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+    while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
     {
-        /* materialize tuple and send it to a shard */
-#if PG_VERSION_NUM >= PG_VERSION_12
-        ExecStoreHeapTuple(tuple, slot, false);
-#else
-        ExecStoreTuple(tuple, slot, InvalidBuffer, false);
-#endif
+        /* send tuple it to a shard */
         copyDest->receiveSlot(slot, copyDest);

         /* clear tuple memory */
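The rewritten loop above follows the generic table AM scan pattern that PostgreSQL 12 introduced: tuples are returned in a TupleTableSlot whose format the access method chooses, so callers no longer materialize HeapTuples themselves. A minimal sketch of the pattern, assuming an open Relation `rel` and a hypothetical process() consumer:

    #include "access/tableam.h"
    #include "executor/tuptable.h"
    #include "utils/snapmgr.h"

    TupleTableSlot *slot = table_slot_create(rel, NULL);
    TableScanDesc scan = table_beginscan(rel, GetActiveSnapshot(), 0, NULL);

    while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
    {
        process(slot);   /* hypothetical per-tuple consumer */
    }

    table_endscan(scan);
    ExecDropSingleTupleTableSlot(slot);

On PG11 the same call compiles through the table_scan_getnextslot() shim added in the version_compat.h hunk further down.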
@@ -1572,22 +1537,6 @@ DistributionColumnUsesGeneratedStoredColumn(TupleDesc relationDesc,
 }


-/*
- * Returns whether given relation uses default access method
- */
-static bool
-RelationUsesHeapAccessMethodOrNone(Relation relation)
-{
-#if PG_VERSION_NUM >= PG_VERSION_12
-
-    return relation->rd_rel->relkind != RELKIND_RELATION ||
-           relation->rd_amhandler == HEAP_TABLE_AM_HANDLER_OID;
-#else
-    return true;
-#endif
-}
-
-
 /*
  * UndistributeTable undistributes the given table. The undistribution is done by
  * creating a new table, moving everything to the new table and dropping the old one.

@@ -26,6 +26,7 @@
 #include "catalog/dependency.h"
 #include "catalog/indexing.h"
 #include "catalog/namespace.h"
+#include "catalog/pg_am.h"
 #include "catalog/pg_attribute.h"
 #include "catalog/pg_authid.h"
 #include "catalog/pg_class.h"

@@ -451,6 +452,27 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults)
         appendStringInfo(&buffer, " PARTITION BY %s ", partitioningInformation);
     }

+#if PG_VERSION_NUM >= 120000
+
+    /*
+     * Add table access methods for pg12 and higher when the table is configured with an
+     * access method
+     */
+    if (OidIsValid(relation->rd_rel->relam))
+    {
+        HeapTuple amTup = SearchSysCache1(AMOID, ObjectIdGetDatum(
+                                              relation->rd_rel->relam));
+        if (!HeapTupleIsValid(amTup))
+        {
+            elog(ERROR, "cache lookup failed for access method %u",
+                 relation->rd_rel->relam);
+        }
+        Form_pg_am amForm = (Form_pg_am) GETSTRUCT(amTup);
+        appendStringInfo(&buffer, " USING %s", quote_identifier(NameStr(amForm->amname)));
+        ReleaseSysCache(amTup);
+    }
+#endif
+
     /*
      * Add any reloptions (storage parameters) defined on the table in a WITH
      * clause.
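The added block reads pg_am through the syscache and raises an error on a failed lookup. A hedged alternative sketch of the same lookup using PostgreSQL's helper get_am_name() from commands/defrem.h, which returns NULL for an unknown OID (so the explicit error is kept):

    if (OidIsValid(relation->rd_rel->relam))
    {
        char *amname = get_am_name(relation->rd_rel->relam);
        if (amname == NULL)
        {
            elog(ERROR, "cache lookup failed for access method %u",
                 relation->rd_rel->relam);
        }
        appendStringInfo(&buffer, " USING %s", quote_identifier(amname));
    }

Either way, the generated DDL now carries the USING clause, so worker shards are created with the same access method as the coordinator table.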
@@ -546,6 +546,19 @@ SupportedDependencyByCitus(const ObjectAddress *address)
      */
     switch (getObjectClass(address))
     {
+#if PG_VERSION_NUM >= 120000
+        case OCLASS_AM:
+        {
+            /*
+             * Only support access methods if they came from extensions
+             * During the dependency resolution it will cascade into the extension and
+             * distributed that one instead of the Access Method. Now access methods can
+             * be configured on tables on the workers.
+             */
+            return IsObjectAddressOwnedByExtension(address, NULL);
+        }
+#endif
+
         case OCLASS_COLLATION:
         case OCLASS_SCHEMA:
         {
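IsObjectAddressOwnedByExtension() boils down to asking whether pg_depend records a DEPENDENCY_EXTENSION edge for the object. A minimal sketch of that check using the core helper from catalog/dependency.h (a simplification; the actual Citus function can also report which extension owns the object):

    #include "catalog/dependency.h"

    static bool
    ObjectIsOwnedByAnExtension(const ObjectAddress *address)
    {
        /* getExtensionOfObject() returns InvalidOid when no extension owns the object */
        return OidIsValid(getExtensionOfObject(address->classId, address->objectId));
    }

Restricting OCLASS_AM support to extension-owned access methods means dependency resolution distributes the extension itself, and running CREATE EXTENSION on each worker brings the access method along.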
@@ -1,504 +0,0 @@
-/*-------------------------------------------------------------------------
- *
- * blackhole_am.c
- *    blackhole table access method code
- *
- * Portions Copyright (c) 1996-PostgreSQL Global Development Group
- * Portions Copyright (c) 1994, Regents of the University of California
- *
- *
- * IDENTIFICATION
- *    Copied from https://github.com/michaelpq/pg_plugins/blob/master/blackhole_am/blackhole_am.c
- *
- *
- * NOTES
- *    This file introduces the table access method blackhole, which can
- *    be used as a template for other table access methods, and guarantees
- *    that any data inserted into it gets sent to the void.
- *
- *-------------------------------------------------------------------------
- */
-
-/* *INDENT-OFF* */
-#include "postgres.h"
-
-#include "distributed/pg_version_constants.h"
-
-#if PG_VERSION_NUM >= PG_VERSION_12
-
-#include "access/tableam.h"
-#include "access/heapam.h"
-#include "access/amapi.h"
-#include "catalog/index.h"
-#include "commands/vacuum.h"
-#include "executor/tuptable.h"
-
-PG_FUNCTION_INFO_V1(blackhole_am_handler);
-
-/* Base structures for scans */
-typedef struct BlackholeScanDescData
-{
-    TableScanDescData rs_base; /* AM independent part of the descriptor */
-
-    /* Add more fields here as needed by the AM. */
-} BlackholeScanDescData;
-typedef struct BlackholeScanDescData *BlackholeScanDesc;
-
-static const TableAmRoutine blackhole_methods;
-
-
-/* ------------------------------------------------------------------------
- * Slot related callbacks for blackhole AM
- * ------------------------------------------------------------------------
- */
-
-static const TupleTableSlotOps *
-blackhole_slot_callbacks(Relation relation)
-{
-    /*
-     * Here you would most likely want to invent your own set of
-     * slot callbacks for your AM.
-     */
-    return &TTSOpsMinimalTuple;
-}
-
-/* ------------------------------------------------------------------------
- * Table Scan Callbacks for blackhole AM
- * ------------------------------------------------------------------------
- */
-
-static TableScanDesc
-blackhole_scan_begin(Relation relation, Snapshot snapshot,
-                     int nkeys, ScanKey key,
-                     ParallelTableScanDesc parallel_scan,
-                     uint32 flags)
-{
-    BlackholeScanDesc scan;
-
-    scan = (BlackholeScanDesc) palloc(sizeof(BlackholeScanDescData));
-
-    scan->rs_base.rs_rd = relation;
-    scan->rs_base.rs_snapshot = snapshot;
-    scan->rs_base.rs_nkeys = nkeys;
-    scan->rs_base.rs_flags = flags;
-    scan->rs_base.rs_parallel = parallel_scan;
-
-    return (TableScanDesc) scan;
-}
-
-static void
-blackhole_scan_end(TableScanDesc sscan)
-{
-    BlackholeScanDesc scan = (BlackholeScanDesc) sscan;
-    pfree(scan);
-}
-
-static void
-blackhole_scan_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
-                      bool allow_strat, bool allow_sync, bool allow_pagemode)
-{
-    /* nothing to do */
-}
-
-static bool
-blackhole_scan_getnextslot(TableScanDesc sscan, ScanDirection direction,
-                           TupleTableSlot *slot)
-{
-    /* nothing to do */
-    return false;
-}
-
-/* ------------------------------------------------------------------------
- * Index Scan Callbacks for blackhole AM
- * ------------------------------------------------------------------------
- */
-
-static IndexFetchTableData *
-blackhole_index_fetch_begin(Relation rel)
-{
-    return NULL;
-}
-
-static void
-blackhole_index_fetch_reset(IndexFetchTableData *scan)
-{
-    /* nothing to do here */
-}
-
-static void
-blackhole_index_fetch_end(IndexFetchTableData *scan)
-{
-    /* nothing to do here */
-}
-
-static bool
-blackhole_index_fetch_tuple(struct IndexFetchTableData *scan,
-                            ItemPointer tid,
-                            Snapshot snapshot,
-                            TupleTableSlot *slot,
-                            bool *call_again, bool *all_dead)
-{
-    /* there is no data */
-    return 0;
-}
-
-
-/* ------------------------------------------------------------------------
- * Callbacks for non-modifying operations on individual tuples for
- * blackhole AM.
- * ------------------------------------------------------------------------
- */
-
-static bool
-blackhole_fetch_row_version(Relation relation,
-                            ItemPointer tid,
-                            Snapshot snapshot,
-                            TupleTableSlot *slot)
-{
-    /* nothing to do */
-    return false;
-}
-
-static void
-blackhole_get_latest_tid(TableScanDesc sscan,
-                         ItemPointer tid)
-{
-    /* nothing to do */
-}
-
-static bool
-blackhole_tuple_tid_valid(TableScanDesc scan, ItemPointer tid)
-{
-    return false;
-}
-
-static bool
-blackhole_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot,
-                                   Snapshot snapshot)
-{
-    return false;
-}
-
-static TransactionId
-blackhole_compute_xid_horizon_for_tuples(Relation rel,
-                                         ItemPointerData *tids,
-                                         int nitems)
-{
-    return InvalidTransactionId;
-}
-
-/* ----------------------------------------------------------------------------
- * Functions for manipulations of physical tuples for blackhole AM.
- * ----------------------------------------------------------------------------
- */
-
-static void
-blackhole_tuple_insert(Relation relation, TupleTableSlot *slot,
-                       CommandId cid, int options, BulkInsertState bistate)
-{
-    /* nothing to do */
-}
-
-static void
-blackhole_tuple_insert_speculative(Relation relation, TupleTableSlot *slot,
-                                   CommandId cid, int options,
-                                   BulkInsertState bistate,
-                                   uint32 specToken)
-{
-    /* nothing to do */
-}
-
-static void
-blackhole_tuple_complete_speculative(Relation relation, TupleTableSlot *slot,
-                                     uint32 spekToken, bool succeeded)
-{
-    /* nothing to do */
-}
-
-static void
-blackhole_multi_insert(Relation relation, TupleTableSlot **slots,
-                       int ntuples, CommandId cid, int options,
-                       BulkInsertState bistate)
-{
-    /* nothing to do */
-}
-
-static TM_Result
-blackhole_tuple_delete(Relation relation, ItemPointer tid, CommandId cid,
-                       Snapshot snapshot, Snapshot crosscheck, bool wait,
-                       TM_FailureData *tmfd, bool changingPart)
-{
-    /* nothing to do, so it is always OK */
-    return TM_Ok;
-}
-
-
-static TM_Result
-blackhole_tuple_update(Relation relation, ItemPointer otid,
-                       TupleTableSlot *slot, CommandId cid,
-                       Snapshot snapshot, Snapshot crosscheck,
-                       bool wait, TM_FailureData *tmfd,
-                       LockTupleMode *lockmode, bool *update_indexes)
-{
-    /* nothing to do, so it is always OK */
-    return TM_Ok;
-}
-
-static TM_Result
-blackhole_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot,
-                     TupleTableSlot *slot, CommandId cid, LockTupleMode mode,
-                     LockWaitPolicy wait_policy, uint8 flags,
-                     TM_FailureData *tmfd)
-{
-    /* nothing to do, so it is always OK */
-    return TM_Ok;
-}
-
-static void
-blackhole_finish_bulk_insert(Relation relation, int options)
-{
-    /* nothing to do */
-}
-
-
-/* ------------------------------------------------------------------------
- * DDL related callbacks for blackhole AM.
- * ------------------------------------------------------------------------
- */
-
-static void
-blackhole_relation_set_new_filenode(Relation rel,
-                                    const RelFileNode *newrnode,
-                                    char persistence,
-                                    TransactionId *freezeXid,
-                                    MultiXactId *minmulti)
-{
-    /* nothing to do */
-}
-
-static void
-blackhole_relation_nontransactional_truncate(Relation rel)
-{
-    /* nothing to do */
-}
-
-static void
-blackhole_copy_data(Relation rel, const RelFileNode *newrnode)
-{
-    /* there is no data */
-}
-
-static void
-blackhole_copy_for_cluster(Relation OldTable, Relation NewTable,
-                           Relation OldIndex, bool use_sort,
-                           TransactionId OldestXmin,
-                           TransactionId *xid_cutoff,
-                           MultiXactId *multi_cutoff,
-                           double *num_tuples,
-                           double *tups_vacuumed,
-                           double *tups_recently_dead)
-{
-    /* no data, so nothing to do */
-}
-
-static void
-blackhole_vacuum(Relation onerel, VacuumParams *params,
-                 BufferAccessStrategy bstrategy)
-{
-    /* no data, so nothing to do */
-}
-
-static bool
-blackhole_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno,
-                                  BufferAccessStrategy bstrategy)
-{
-    /* no data, so no point to analyze next block */
-    return false;
-}
-
-static bool
-blackhole_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin,
-                                  double *liverows, double *deadrows,
-                                  TupleTableSlot *slot)
-{
-    /* no data, so no point to analyze next tuple */
-    return false;
-}
-
-static double
-blackhole_index_build_range_scan(Relation tableRelation,
-                                 Relation indexRelation,
-                                 IndexInfo *indexInfo,
-                                 bool allow_sync,
-                                 bool anyvisible,
-                                 bool progress,
-                                 BlockNumber start_blockno,
-                                 BlockNumber numblocks,
-                                 IndexBuildCallback callback,
-                                 void *callback_state,
-                                 TableScanDesc scan)
-{
-    /* no data, so no tuples */
-    return 0;
-}
-
-static void
-blackhole_index_validate_scan(Relation tableRelation,
-                              Relation indexRelation,
-                              IndexInfo *indexInfo,
-                              Snapshot snapshot,
-                              ValidateIndexState *state)
-{
-    /* nothing to do */
-}
-
-
-/* ------------------------------------------------------------------------
- * Miscellaneous callbacks for the blackhole AM
- * ------------------------------------------------------------------------
- */
-
-static uint64
-blackhole_relation_size(Relation rel, ForkNumber forkNumber)
-{
-    /* there is nothing */
-    return 0;
-}
-
-/*
- * Check to see whether the table needs a TOAST table.
- */
-static bool
-blackhole_relation_needs_toast_table(Relation rel)
-{
-    /* no data, so no toast table needed */
-    return false;
-}
-
-
-/* ------------------------------------------------------------------------
- * Planner related callbacks for the blackhole AM
- * ------------------------------------------------------------------------
- */
-
-static void
-blackhole_estimate_rel_size(Relation rel, int32 *attr_widths,
-                            BlockNumber *pages, double *tuples,
-                            double *allvisfrac)
-{
-    /* no data available */
-    *attr_widths = 0;
-    *tuples = 0;
-    *allvisfrac = 0;
-    *pages = 0;
-}
-
-
-/* ------------------------------------------------------------------------
- * Executor related callbacks for the blackhole AM
- * ------------------------------------------------------------------------
- */
-
-static bool
-blackhole_scan_bitmap_next_block(TableScanDesc scan,
-                                 TBMIterateResult *tbmres)
-{
-    /* no data, so no point to scan next block */
-    return false;
-}
-
-static bool
-blackhole_scan_bitmap_next_tuple(TableScanDesc scan,
-                                 TBMIterateResult *tbmres,
-                                 TupleTableSlot *slot)
-{
-    /* no data, so no point to scan next tuple */
-    return false;
-}
-
-static bool
-blackhole_scan_sample_next_block(TableScanDesc scan,
-                                 SampleScanState *scanstate)
-{
-    /* no data, so no point to scan next block for sampling */
-    return false;
-}
-
-static bool
-blackhole_scan_sample_next_tuple(TableScanDesc scan,
-                                 SampleScanState *scanstate,
-                                 TupleTableSlot *slot)
-{
-    /* no data, so no point to scan next tuple for sampling */
-    return false;
-}
-
-
-/* ------------------------------------------------------------------------
- * Definition of the blackhole table access method.
- * ------------------------------------------------------------------------
- */
-
-static const TableAmRoutine blackhole_methods = {
-    .type = T_TableAmRoutine,
-
-    .slot_callbacks = blackhole_slot_callbacks,
-
-    .scan_begin = blackhole_scan_begin,
-    .scan_end = blackhole_scan_end,
-    .scan_rescan = blackhole_scan_rescan,
-    .scan_getnextslot = blackhole_scan_getnextslot,
-
-    /* these are common helper functions */
-    .parallelscan_estimate = table_block_parallelscan_estimate,
-    .parallelscan_initialize = table_block_parallelscan_initialize,
-    .parallelscan_reinitialize = table_block_parallelscan_reinitialize,
-
-    .index_fetch_begin = blackhole_index_fetch_begin,
-    .index_fetch_reset = blackhole_index_fetch_reset,
-    .index_fetch_end = blackhole_index_fetch_end,
-    .index_fetch_tuple = blackhole_index_fetch_tuple,
-
-    .tuple_insert = blackhole_tuple_insert,
-    .tuple_insert_speculative = blackhole_tuple_insert_speculative,
-    .tuple_complete_speculative = blackhole_tuple_complete_speculative,
-    .multi_insert = blackhole_multi_insert,
-    .tuple_delete = blackhole_tuple_delete,
-    .tuple_update = blackhole_tuple_update,
-    .tuple_lock = blackhole_tuple_lock,
-    .finish_bulk_insert = blackhole_finish_bulk_insert,
-
-    .tuple_fetch_row_version = blackhole_fetch_row_version,
-    .tuple_get_latest_tid = blackhole_get_latest_tid,
-    .tuple_tid_valid = blackhole_tuple_tid_valid,
-    .tuple_satisfies_snapshot = blackhole_tuple_satisfies_snapshot,
-    .compute_xid_horizon_for_tuples = blackhole_compute_xid_horizon_for_tuples,
-
-    .relation_set_new_filenode = blackhole_relation_set_new_filenode,
-    .relation_nontransactional_truncate = blackhole_relation_nontransactional_truncate,
-    .relation_copy_data = blackhole_copy_data,
-    .relation_copy_for_cluster = blackhole_copy_for_cluster,
-    .relation_vacuum = blackhole_vacuum,
-    .scan_analyze_next_block = blackhole_scan_analyze_next_block,
-    .scan_analyze_next_tuple = blackhole_scan_analyze_next_tuple,
-    .index_build_range_scan = blackhole_index_build_range_scan,
-    .index_validate_scan = blackhole_index_validate_scan,
-
-    .relation_size = blackhole_relation_size,
-    .relation_needs_toast_table = blackhole_relation_needs_toast_table,
-
-    .relation_estimate_size = blackhole_estimate_rel_size,
-
-    .scan_bitmap_next_block = blackhole_scan_bitmap_next_block,
-    .scan_bitmap_next_tuple = blackhole_scan_bitmap_next_tuple,
-    .scan_sample_next_block = blackhole_scan_sample_next_block,
-    .scan_sample_next_tuple = blackhole_scan_sample_next_tuple
-};
-
-
-Datum
-blackhole_am_handler(PG_FUNCTION_ARGS)
-{
-    PG_RETURN_POINTER(&blackhole_methods);
-}
-
-#endif
@@ -0,0 +1,591 @@
+/*-------------------------------------------------------------------------
+ *
+ * fake_am.c
+ *    fake table access method code
+ *
+ * Copyright (c) Citus Data, Inc.
+ *
+ * IDENTIFICATION
+ *    Based on https://github.com/michaelpq/pg_plugins/blob/master/blackhole_am/blackhole_am.c
+ *
+ *
+ * NOTES
+ *    This file introduces the table access method "fake", which delegates
+ *    bare minimum functionality for testing to heapam to provide an append
+ *    only access method, and doesn't implement rest of the functionality.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "distributed/pg_version_constants.h"
+
+#if PG_VERSION_NUM >= PG_VERSION_12
+
+#include "access/amapi.h"
+#include "access/heapam.h"
+#include "access/tableam.h"
+#include "access/multixact.h"
+#include "access/xact.h"
+#include "catalog/index.h"
+#include "catalog/storage.h"
+#include "catalog/storage_xlog.h"
+#include "commands/vacuum.h"
+#include "executor/tuptable.h"
+#include "storage/smgr.h"
+#include "utils/snapmgr.h"
+
+PG_FUNCTION_INFO_V1(fake_am_handler);
+
+static const TableAmRoutine fake_methods;
+
+
+/* ------------------------------------------------------------------------
+ * Slot related callbacks for fake AM
+ * ------------------------------------------------------------------------
+ */
+static const TupleTableSlotOps *
+fake_slot_callbacks(Relation relation)
+{
+    return &TTSOpsBufferHeapTuple;
+}
+
+
+/* ------------------------------------------------------------------------
+ * Table Scan Callbacks for fake AM
+ * ------------------------------------------------------------------------
+ */
+static TableScanDesc
+fake_scan_begin(Relation relation, Snapshot snapshot,
+                int nkeys, ScanKey key,
+                ParallelTableScanDesc parallel_scan,
+                uint32 flags)
+{
+    return heap_beginscan(relation, snapshot, nkeys, key, parallel_scan, flags);
+}
+
+
+static void
+fake_scan_end(TableScanDesc sscan)
+{
+    heap_endscan(sscan);
+}
+
+
+static void
+fake_scan_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
+                 bool allow_strat, bool allow_sync, bool allow_pagemode)
+{
+    heap_rescan(sscan, key, set_params, allow_strat, allow_sync, allow_pagemode);
+}
+
+
+static bool
+fake_scan_getnextslot(TableScanDesc sscan, ScanDirection direction,
+                      TupleTableSlot *slot)
+{
+    ereport(WARNING, (errmsg("fake_scan_getnextslot")));
+    return heap_getnextslot(sscan, direction, slot);
+}
+
+
+/* ------------------------------------------------------------------------
+ * Index Scan Callbacks for fake AM
+ * ------------------------------------------------------------------------
+ */
+static IndexFetchTableData *
+fake_index_fetch_begin(Relation rel)
+{
+    elog(ERROR, "fake_index_fetch_begin not implemented");
+    return NULL;
+}
+
+
+static void
+fake_index_fetch_reset(IndexFetchTableData *scan)
+{
+    elog(ERROR, "fake_index_fetch_reset not implemented");
+}
+
+
+static void
+fake_index_fetch_end(IndexFetchTableData *scan)
+{
+    elog(ERROR, "fake_index_fetch_end not implemented");
+}
+
+
+static bool
+fake_index_fetch_tuple(struct IndexFetchTableData *scan,
+                       ItemPointer tid,
+                       Snapshot snapshot,
+                       TupleTableSlot *slot,
+                       bool *call_again, bool *all_dead)
+{
+    elog(ERROR, "fake_index_fetch_tuple not implemented");
+    return false;
+}
+
+
+/* ------------------------------------------------------------------------
+ * Callbacks for non-modifying operations on individual tuples for
+ * fake AM.
+ * ------------------------------------------------------------------------
+ */
+static bool
+fake_fetch_row_version(Relation relation,
+                       ItemPointer tid,
+                       Snapshot snapshot,
+                       TupleTableSlot *slot)
+{
+    elog(ERROR, "fake_fetch_row_version not implemented");
+    return false;
+}
+
+
+static void
+fake_get_latest_tid(TableScanDesc sscan,
+                    ItemPointer tid)
+{
+    elog(ERROR, "fake_get_latest_tid not implemented");
+}
+
+
+static bool
+fake_tuple_tid_valid(TableScanDesc scan, ItemPointer tid)
+{
+    elog(ERROR, "fake_tuple_tid_valid not implemented");
+    return false;
+}
+
+
+static bool
+fake_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot,
+                              Snapshot snapshot)
+{
+    elog(ERROR, "fake_tuple_satisfies_snapshot not implemented");
+    return false;
+}
+
+
+static TransactionId
+fake_compute_xid_horizon_for_tuples(Relation rel,
+                                    ItemPointerData *tids,
+                                    int nitems)
+{
+    elog(ERROR, "fake_compute_xid_horizon_for_tuples not implemented");
+    return InvalidTransactionId;
+}
+
+
+/* ----------------------------------------------------------------------------
+ * Functions for manipulations of physical tuples for fake AM.
+ * ----------------------------------------------------------------------------
+ */
+static void
+fake_tuple_insert(Relation relation, TupleTableSlot *slot,
+                  CommandId cid, int options, BulkInsertState bistate)
+{
+    ereport(WARNING, (errmsg("fake_tuple_insert")));
+
+    /*
+     * Code below this point is from heapam_tuple_insert from
+     * heapam_handler.c
+     */
+
+    bool shouldFree = true;
+    HeapTuple tuple = ExecFetchSlotHeapTuple(slot, true, &shouldFree);
+
+    /* Update the tuple with table oid */
+    slot->tts_tableOid = RelationGetRelid(relation);
+    tuple->t_tableOid = slot->tts_tableOid;
+
+    /* Perform the insertion, and copy the resulting ItemPointer */
+    heap_insert(relation, tuple, cid, options, bistate);
+    ItemPointerCopy(&tuple->t_self, &slot->tts_tid);
+
+    if (shouldFree)
+    {
+        pfree(tuple);
+    }
+}
+
+
+static void
+fake_tuple_insert_speculative(Relation relation, TupleTableSlot *slot,
+                              CommandId cid, int options,
+                              BulkInsertState bistate,
+                              uint32 specToken)
+{
+    elog(ERROR, "fake_tuple_insert_speculative not implemented");
+}
+
+
+static void
+fake_tuple_complete_speculative(Relation relation, TupleTableSlot *slot,
+                                uint32 spekToken, bool succeeded)
+{
+    elog(ERROR, "fake_tuple_complete_speculative not implemented");
+}
+
+
+static void
+fake_multi_insert(Relation relation, TupleTableSlot **slots,
+                  int ntuples, CommandId cid, int options,
+                  BulkInsertState bistate)
+{
+    ereport(WARNING, (errmsg("fake_multi_insert")));
+
+    heap_multi_insert(relation, slots, ntuples, cid, options, bistate);
+}
+
+
+static TM_Result
+fake_tuple_delete(Relation relation, ItemPointer tid, CommandId cid,
+                  Snapshot snapshot, Snapshot crosscheck, bool wait,
+                  TM_FailureData *tmfd, bool changingPart)
+{
+    elog(ERROR, "fake_tuple_delete not implemented");
+}
+
+
+static TM_Result
+fake_tuple_update(Relation relation, ItemPointer otid,
+                  TupleTableSlot *slot, CommandId cid,
+                  Snapshot snapshot, Snapshot crosscheck,
+                  bool wait, TM_FailureData *tmfd,
+                  LockTupleMode *lockmode, bool *update_indexes)
+{
+    elog(ERROR, "fake_tuple_update not implemented");
+}
+
+
+static TM_Result
+fake_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot,
+                TupleTableSlot *slot, CommandId cid, LockTupleMode mode,
+                LockWaitPolicy wait_policy, uint8 flags,
+                TM_FailureData *tmfd)
+{
+    elog(ERROR, "fake_tuple_lock not implemented");
+}
+
+
+static void
+fake_finish_bulk_insert(Relation relation, int options)
+{
+    /* nothing to do here */
+}
+
+
+/* ------------------------------------------------------------------------
+ * DDL related callbacks for fake AM.
+ * ------------------------------------------------------------------------
+ */
+static void
+fake_relation_set_new_filenode(Relation rel,
+                               const RelFileNode *newrnode,
+                               char persistence,
+                               TransactionId *freezeXid,
+                               MultiXactId *minmulti)
+{
+    /*
+     * Code below is copied from heapam_relation_set_new_filenode in
+     * heapam_handler.c.
+     */
+
+    /*
+     * Initialize to the minimum XID that could put tuples in the table. We
+     * know that no xacts older than RecentXmin are still running, so that
+     * will do.
+     */
+    *freezeXid = RecentXmin;
+
+    /*
+     * Similarly, initialize the minimum Multixact to the first value that
+     * could possibly be stored in tuples in the table. Running transactions
+     * could reuse values from their local cache, so we are careful to
+     * consider all currently running multis.
+     *
+     * XXX this could be refined further, but is it worth the hassle?
+     */
+    *minmulti = GetOldestMultiXactId();
+
+    SMgrRelation srel = RelationCreateStorage(*newrnode, persistence);
+
+    /*
+     * If required, set up an init fork for an unlogged table so that it can
+     * be correctly reinitialized on restart. An immediate sync is required
+     * even if the page has been logged, because the write did not go through
+     * shared_buffers and therefore a concurrent checkpoint may have moved the
+     * redo pointer past our xlog record. Recovery may as well remove it
+     * while replaying, for example, XLOG_DBASE_CREATE or XLOG_TBLSPC_CREATE
+     * record. Therefore, logging is necessary even if wal_level=minimal.
+     */
+    if (persistence == RELPERSISTENCE_UNLOGGED)
+    {
+        Assert(rel->rd_rel->relkind == RELKIND_RELATION ||
+               rel->rd_rel->relkind == RELKIND_MATVIEW ||
+               rel->rd_rel->relkind == RELKIND_TOASTVALUE);
+        smgrcreate(srel, INIT_FORKNUM, false);
+        log_smgrcreate(newrnode, INIT_FORKNUM);
+        smgrimmedsync(srel, INIT_FORKNUM);
+    }
+
+    smgrclose(srel);
+}
+
+
+static void
+fake_relation_nontransactional_truncate(Relation rel)
+{
+    elog(ERROR, "fake_relation_nontransactional_truncate not implemented");
+}
+
+
+static void
+fake_copy_data(Relation rel, const RelFileNode *newrnode)
+{
+    elog(ERROR, "fake_copy_data not implemented");
+}
+
+
+static void
+fake_copy_for_cluster(Relation OldTable, Relation NewTable,
+                      Relation OldIndex, bool use_sort,
+                      TransactionId OldestXmin,
+                      TransactionId *xid_cutoff,
+                      MultiXactId *multi_cutoff,
+                      double *num_tuples,
+                      double *tups_vacuumed,
+                      double *tups_recently_dead)
+{
+    elog(ERROR, "fake_copy_for_cluster not implemented");
+}
+
+
+static void
+fake_vacuum(Relation onerel, VacuumParams *params,
+            BufferAccessStrategy bstrategy)
+{
+    elog(WARNING, "fake_copy_for_cluster not implemented");
+}
+
+
+static bool
+fake_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno,
+                             BufferAccessStrategy bstrategy)
+{
+    /* we don't support analyze, so return false */
+    return false;
+}
+
+
+static bool
+fake_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin,
+                             double *liverows, double *deadrows,
+                             TupleTableSlot *slot)
+{
+    elog(ERROR, "fake_scan_analyze_next_tuple not implemented");
+}
+
+
+static double
+fake_index_build_range_scan(Relation tableRelation,
+                            Relation indexRelation,
+                            IndexInfo *indexInfo,
+                            bool allow_sync,
+                            bool anyvisible,
+                            bool progress,
+                            BlockNumber start_blockno,
+                            BlockNumber numblocks,
+                            IndexBuildCallback callback,
+                            void *callback_state,
+                            TableScanDesc scan)
+{
+    elog(ERROR, "fake_index_build_range_scan not implemented");
+}
+
+
+static void
+fake_index_validate_scan(Relation tableRelation,
+                         Relation indexRelation,
+                         IndexInfo *indexInfo,
+                         Snapshot snapshot,
+                         ValidateIndexState *state)
+{
+    elog(ERROR, "fake_index_build_range_scan not implemented");
+}
+
+
+/* ------------------------------------------------------------------------
+ * Miscellaneous callbacks for the fake AM
+ * ------------------------------------------------------------------------
+ */
+static uint64
+fake_relation_size(Relation rel, ForkNumber forkNumber)
+{
+    /*
+     * Code below is copied from heapam_relation_size from
+     * heapam_handler.c.
+     */
+
+    uint64 nblocks = 0;
+
+    /* Open it at the smgr level if not already done */
+    RelationOpenSmgr(rel);
+
+    /* InvalidForkNumber indicates returning the size for all forks */
+    if (forkNumber == InvalidForkNumber)
+    {
+        for (int i = 0; i < MAX_FORKNUM; i++)
+        {
+            nblocks += smgrnblocks(rel->rd_smgr, i);
+        }
+    }
+    else
+    {
+        nblocks = smgrnblocks(rel->rd_smgr, forkNumber);
+    }
+
+    return nblocks * BLCKSZ;
+}
+
+
+/*
+ * Check to see whether the table needs a TOAST table.
+ */
+static bool
+fake_relation_needs_toast_table(Relation rel)
+{
+    /* we don't test toastable data with this, so no toast table needed */
+    return false;
+}
+
+
+/* ------------------------------------------------------------------------
+ * Planner related callbacks for the fake AM
+ * ------------------------------------------------------------------------
+ */
+static void
+fake_estimate_rel_size(Relation rel, int32 *attr_widths,
+                       BlockNumber *pages, double *tuples,
+                       double *allvisfrac)
+{
+    /* no data available */
+    *attr_widths = 0;
+    *tuples = 0;
+    *allvisfrac = 0;
+    *pages = 0;
+}
+
+
+/* ------------------------------------------------------------------------
+ * Executor related callbacks for the fake AM
+ * ------------------------------------------------------------------------
+ */
+static bool
+fake_scan_bitmap_next_block(TableScanDesc scan,
+                            TBMIterateResult *tbmres)
+{
+    elog(ERROR, "fake_scan_bitmap_next_block not implemented");
+}
+
+
+static bool
+fake_scan_bitmap_next_tuple(TableScanDesc scan,
+                            TBMIterateResult *tbmres,
+                            TupleTableSlot *slot)
+{
+    elog(ERROR, "fake_scan_bitmap_next_tuple not implemented");
+}
+
+
+static bool
+fake_scan_sample_next_block(TableScanDesc scan,
+                            SampleScanState *scanstate)
+{
+    elog(ERROR, "fake_scan_sample_next_block not implemented");
+}
+
+
+static bool
+fake_scan_sample_next_tuple(TableScanDesc scan,
+                            SampleScanState *scanstate,
+                            TupleTableSlot *slot)
+{
+    elog(ERROR, "fake_scan_sample_next_tuple not implemented");
+}
+
+
+/* ------------------------------------------------------------------------
+ * Definition of the fake table access method.
+ * ------------------------------------------------------------------------
+ */
+
+static const TableAmRoutine fake_methods = {
+    .type = T_TableAmRoutine,
+
+    .slot_callbacks = fake_slot_callbacks,
+
+    .scan_begin = fake_scan_begin,
+    .scan_end = fake_scan_end,
+    .scan_rescan = fake_scan_rescan,
+    .scan_getnextslot = fake_scan_getnextslot,
+
+    /* these are common helper functions */
+    .parallelscan_estimate = table_block_parallelscan_estimate,
+    .parallelscan_initialize = table_block_parallelscan_initialize,
+    .parallelscan_reinitialize = table_block_parallelscan_reinitialize,
+
+    .index_fetch_begin = fake_index_fetch_begin,
+    .index_fetch_reset = fake_index_fetch_reset,
+    .index_fetch_end = fake_index_fetch_end,
+    .index_fetch_tuple = fake_index_fetch_tuple,
+
+    .tuple_insert = fake_tuple_insert,
+    .tuple_insert_speculative = fake_tuple_insert_speculative,
+    .tuple_complete_speculative = fake_tuple_complete_speculative,
+    .multi_insert = fake_multi_insert,
+    .tuple_delete = fake_tuple_delete,
+    .tuple_update = fake_tuple_update,
+    .tuple_lock = fake_tuple_lock,
+    .finish_bulk_insert = fake_finish_bulk_insert,
+
+    .tuple_fetch_row_version = fake_fetch_row_version,
+    .tuple_get_latest_tid = fake_get_latest_tid,
+    .tuple_tid_valid = fake_tuple_tid_valid,
+    .tuple_satisfies_snapshot = fake_tuple_satisfies_snapshot,
+    .compute_xid_horizon_for_tuples = fake_compute_xid_horizon_for_tuples,
+
+    .relation_set_new_filenode = fake_relation_set_new_filenode,
+    .relation_nontransactional_truncate = fake_relation_nontransactional_truncate,
+    .relation_copy_data = fake_copy_data,
+    .relation_copy_for_cluster = fake_copy_for_cluster,
+    .relation_vacuum = fake_vacuum,
+    .scan_analyze_next_block = fake_scan_analyze_next_block,
+    .scan_analyze_next_tuple = fake_scan_analyze_next_tuple,
+    .index_build_range_scan = fake_index_build_range_scan,
+    .index_validate_scan = fake_index_validate_scan,
+
+    .relation_size = fake_relation_size,
+    .relation_needs_toast_table = fake_relation_needs_toast_table,
+
+    .relation_estimate_size = fake_estimate_rel_size,
+
+    .scan_bitmap_next_block = fake_scan_bitmap_next_block,
+    .scan_bitmap_next_tuple = fake_scan_bitmap_next_tuple,
+    .scan_sample_next_block = fake_scan_sample_next_block,
+    .scan_sample_next_tuple = fake_scan_sample_next_tuple
+};
+
+
+Datum
+fake_am_handler(PG_FUNCTION_ARGS)
+{
+    PG_RETURN_POINTER(&fake_methods);
+}
+
+#endif
@@ -15,10 +15,13 @@

 #include "distributed/pg_version_constants.h"

+#include "access/sdir.h"
+#include "access/heapam.h"
 #include "commands/explain.h"
 #include "catalog/namespace.h"
 #include "distributed/citus_ruleutils.h"
 #include "distributed/citus_safe_lib.h"
+#include "executor/tuptable.h"
 #include "nodes/parsenodes.h"
 #include "parser/parse_func.h"
 #if (PG_VERSION_NUM >= PG_VERSION_12)

@@ -61,6 +64,7 @@
 #endif
 #if PG_VERSION_NUM >= PG_VERSION_12

+#define CreateTableSlotForRel(rel) table_slot_create(rel, NULL)
 #define MakeSingleTupleTableSlotCompat MakeSingleTupleTableSlot
 #define AllocSetContextCreateExtended AllocSetContextCreateInternal
 #define NextCopyFromCompat NextCopyFrom

@@ -122,6 +126,7 @@ FileCompatFromFileStart(File fileDesc)


 #else /* pre PG12 */
+#define CreateTableSlotForRel(rel) MakeSingleTupleTableSlot(RelationGetDescr(rel))
 #define table_open(r, l) heap_open(r, l)
 #define table_openrv(r, l) heap_openrv(r, l)
 #define table_openrv_extended(r, l, m) heap_openrv_extended(r, l, m)

@@ -184,6 +189,23 @@ FileCompatFromFileStart(File fileDesc)
 }


+/*
+ * postgres 11 equivalent for a function with the same name in postgres 12+.
+ */
+static inline bool
+table_scan_getnextslot(HeapScanDesc scan, ScanDirection dir, TupleTableSlot *slot)
+{
+    HeapTuple tuple = heap_getnext(scan, ForwardScanDirection);
+    if (tuple == NULL)
+    {
+        return false;
+    }
+
+    ExecStoreTuple(tuple, slot, InvalidBuffer, false);
+    return true;
+}
+
+
 #endif /* PG12 */

 #define fcSetArg(fc, n, value) fcSetArgExt(fc, n, value, false)
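With the CreateTableSlotForRel() and table_scan_getnextslot() shims above, the copy loop earlier in this diff compiles unchanged on both majors; on PG11 the shim fetches a HeapTuple itself and stores it into the slot. Note the shim always scans forward and ignores its dir argument, which suffices for the forward-only call site this commit introduces. A minimal usage sketch (on PG11, scan is a HeapScanDesc):

    while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
    {
        copyDest->receiveSlot(slot, copyDest);
    }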
@@ -14,6 +14,9 @@ s/shard [0-9]+/shard xxxxx/g
 s/assigned task [0-9]+ to node/assigned task to node/
 s/node group [12] (but|does)/node group \1/

+# discard "USING heap" in "CREATE TABLE ... USING heap"
+s/CREATE(.*)TABLE(.*)USING heap/CREATE\1TABLE\2/g
+
 # Differing names can have differing table column widths
 s/^-[+-]{2,}$/---------------------------------------------------------------------/g

@@ -100,6 +103,7 @@ s/_id_ref_id_fkey/_id_fkey/g
 s/_ref_id_id_fkey_/_ref_id_fkey_/g
 s/fk_test_2_col1_col2_fkey/fk_test_2_col1_fkey/g
 s/_id_other_column_ref_fkey/_id_fkey/g
+s/"(collections_list_|collection_users_|collection_users_fkey_)[0-9]+"/"\1xxxxxxx"/g

 # pg13 changes
 s/of relation ".*" violates not-null constraint/violates not-null constraint/g
@ -10,16 +10,6 @@ SET citus.next_shard_id TO 60000;
|
||||||
SET citus.next_placement_id TO 60000;
|
SET citus.next_placement_id TO 60000;
|
||||||
create schema test_pg12;
|
create schema test_pg12;
|
||||||
set search_path to test_pg12;
|
set search_path to test_pg12;
|
||||||
CREATE FUNCTION blackhole_am_handler(internal)
|
|
||||||
RETURNS table_am_handler
|
|
||||||
AS 'citus'
|
|
||||||
LANGUAGE C;
|
|
||||||
CREATE ACCESS METHOD blackhole_am TYPE TABLE HANDLER blackhole_am_handler;
|
|
||||||
create table test_am(id int, val int) using blackhole_am;
|
|
||||||
insert into test_am values (1, 1);
|
|
||||||
-- Custom table access methods should be rejected
|
|
||||||
select create_distributed_table('test_am','id');
|
|
||||||
ERROR: cannot distribute relations using non-heap access methods
|
|
||||||
-- Test generated columns
|
-- Test generated columns
|
||||||
-- val1 after val2 to test https://github.com/citusdata/citus/issues/3538
|
-- val1 after val2 to test https://github.com/citusdata/citus/issues/3538
|
||||||
create table gen1 (
|
create table gen1 (
|
||||||
|
@@ -302,8 +292,8 @@ HINT: To remove the local data, run: SELECT truncate_local_data_after_distribut
 
 -- should still fail because of fkey
 INSERT INTO collection_users VALUES (1, 1000, 1);
-ERROR: insert or update on table "collection_users_60028" violates foreign key constraint "collection_users_fkey_60028"
-DETAIL: Key (key, collection_id)=(1, 1000) is not present in table "collections_list_60016".
+ERROR: insert or update on table "collection_users_xxxxxxx" violates foreign key constraint "collection_users_fkey_xxxxxxx"
+DETAIL: Key (key, collection_id)=(1, 1000) is not present in table "collections_list_xxxxxxx".
 CONTEXT: while executing command on localhost:xxxxx
 -- whereas new record with partition should go through
 INSERT INTO collections_list VALUES (2, 1, '1.2');
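These expected lines change because of the normalization rule added to normalize.sed above: s/"(collections_list_|collection_users_|collection_users_fkey_)[0-9]+"/"\1xxxxxxx"/g masks the numeric shard suffix, replacing concrete shard ids such as 60028 and 60016 with xxxxxxx.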
@@ -423,6 +413,6 @@ where val = 'asdf';
 
 \set VERBOSITY terse
 drop schema test_pg12 cascade;
-NOTICE: drop cascades to 13 other objects
+NOTICE: drop cascades to 10 other objects
 \set VERBOSITY default
 SET citus.shard_replication_factor to 2;
@@ -0,0 +1,323 @@
+SHOW server_version \gset
+SELECT substring(:'server_version', '\d+')::int > 11 AS server_version_above_eleven
+\gset
+\if :server_version_above_eleven
+\else
+\q
+\endif
+SET citus.shard_replication_factor to 1;
+SET citus.next_shard_id TO 60000;
+SET citus.next_placement_id TO 60000;
+SET citus.shard_count TO 4;
+create schema test_tableam;
+set search_path to test_tableam;
+SELECT public.run_command_on_coordinator_and_workers($Q$
+CREATE FUNCTION fake_am_handler(internal)
+RETURNS table_am_handler
+AS 'citus'
+LANGUAGE C;
+CREATE ACCESS METHOD fake_am TYPE TABLE HANDLER fake_am_handler;
+$Q$);
+run_command_on_coordinator_and_workers
+---------------------------------------------------------------------
+
+(1 row)
+
+--
+-- Hash distributed table using a non-default table access method
+--
+create table test_hash_dist(id int, val int) using fake_am;
+insert into test_hash_dist values (1, 1);
+WARNING: fake_tuple_insert
+select create_distributed_table('test_hash_dist','id');
+WARNING: fake_scan_getnextslot
+CONTEXT: SQL statement "SELECT EXISTS (SELECT 1 FROM test_tableam.test_hash_dist)"
+WARNING: fake_scan_getnextslot
+NOTICE: Copying data from local table...
+WARNING: fake_scan_getnextslot
+NOTICE: copying the data has completed
+DETAIL: The local data in the table is no longer visible, but is still on disk.
+HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$test_tableam.test_hash_dist$$)
+create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+select * from test_hash_dist;
+WARNING: fake_scan_getnextslot
+DETAIL: from localhost:xxxxx
+WARNING: fake_scan_getnextslot
+DETAIL: from localhost:xxxxx
+WARNING: fake_scan_getnextslot
+DETAIL: from localhost:xxxxx
+WARNING: fake_scan_getnextslot
+DETAIL: from localhost:xxxxx
+WARNING: fake_scan_getnextslot
+DETAIL: from localhost:xxxxx
+id | val
+---------------------------------------------------------------------
+1 | 1
+(1 row)
+
+insert into test_hash_dist values (1, 1);
+WARNING: fake_tuple_insert
+DETAIL: from localhost:xxxxx
+-- we should error on following, since this AM is append only
+SET client_min_messages TO ERROR;
+delete from test_hash_dist where id=1;
+ERROR: fake_tuple_delete not implemented
+CONTEXT: while executing command on localhost:xxxxx
+update test_hash_dist set val=2 where id=2;
+RESET client_min_messages;
+-- ddl events should include "USING fake_am"
+SELECT * FROM master_get_table_ddl_events('test_hash_dist');
+master_get_table_ddl_events
+---------------------------------------------------------------------
+CREATE TABLE test_tableam.test_hash_dist (id integer, val integer) USING fake_am
+ALTER TABLE test_tableam.test_hash_dist OWNER TO postgres
+(2 rows)
+
+--
+-- Reference table using a non-default table access method
+--
+create table test_ref(a int) using fake_am;
+insert into test_ref values (1);
+WARNING: fake_tuple_insert
+select create_reference_table('test_ref');
+WARNING: fake_scan_getnextslot
+CONTEXT: SQL statement "SELECT EXISTS (SELECT 1 FROM test_tableam.test_ref)"
+WARNING: fake_scan_getnextslot
+NOTICE: Copying data from local table...
+WARNING: fake_scan_getnextslot
+NOTICE: copying the data has completed
+DETAIL: The local data in the table is no longer visible, but is still on disk.
+HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$test_tableam.test_ref$$)
+create_reference_table
+---------------------------------------------------------------------
+
+(1 row)
+
+select * from test_ref;
+WARNING: fake_scan_getnextslot
+DETAIL: from localhost:xxxxx
+WARNING: fake_scan_getnextslot
+DETAIL: from localhost:xxxxx
+a
+---------------------------------------------------------------------
+1
+(1 row)
+
+insert into test_ref values (1);
+WARNING: fake_tuple_insert
+DETAIL: from localhost:xxxxx
+WARNING: fake_tuple_insert
+DETAIL: from localhost:xxxxx
+-- we should error on following, since this AM is append only
+SET client_min_messages TO ERROR;
+delete from test_ref;
+ERROR: fake_tuple_delete not implemented
+CONTEXT: while executing command on localhost:xxxxx
+update test_ref set a=2;
+ERROR: fake_tuple_update not implemented
+CONTEXT: while executing command on localhost:xxxxx
+RESET client_min_messages;
+-- ddl events should include "USING fake_am"
+SELECT * FROM master_get_table_ddl_events('test_ref');
+master_get_table_ddl_events
+---------------------------------------------------------------------
+CREATE TABLE test_tableam.test_ref (a integer) USING fake_am
+ALTER TABLE test_tableam.test_ref OWNER TO postgres
+(2 rows)
+
+-- replicate to coordinator
+SET client_min_messages TO WARNING;
+SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
+?column?
+---------------------------------------------------------------------
+1
+(1 row)
+
+RESET client_min_messages;
+delete from test_ref;
+WARNING: fake_scan_getnextslot
+ERROR: fake_tuple_delete not implemented
+SELECT master_remove_node('localhost', :master_port);
+master_remove_node
+---------------------------------------------------------------------
+
+(1 row)
+
+--
+-- Range partitioned table using a non-default table access method
+--
+CREATE TABLE test_range_dist(id int, val int) using fake_am;
+SELECT create_distributed_table('test_range_dist', 'id', 'range');
+WARNING: fake_scan_getnextslot
+CONTEXT: SQL statement "SELECT EXISTS (SELECT 1 FROM test_tableam.test_range_dist)"
+WARNING: fake_scan_getnextslot
+CONTEXT: SQL statement "SELECT EXISTS (SELECT 1 FROM test_tableam.test_range_dist)"
+create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+CALL public.create_range_partitioned_shards('test_range_dist', '{"0","25"}','{"24","49"}');
+select * from test_range_dist;
+WARNING: fake_scan_getnextslot
+DETAIL: from localhost:xxxxx
+WARNING: fake_scan_getnextslot
+DETAIL: from localhost:xxxxx
+id | val
+---------------------------------------------------------------------
+(0 rows)
+
+insert into test_range_dist values (1, 1);
+WARNING: fake_tuple_insert
+DETAIL: from localhost:xxxxx
+COPY test_range_dist FROM PROGRAM 'echo 0, 0 && echo 1, -1 && echo 2, 4 && echo 3, 9' WITH CSV;
+COPY test_range_dist FROM PROGRAM 'echo 25, 16 && echo 26, 1 && echo 27, 4 && echo 7, 9' WITH CSV;
+-- ddl events should include "USING fake_am"
+SELECT * FROM master_get_table_ddl_events('test_range_dist');
+master_get_table_ddl_events
+---------------------------------------------------------------------
+CREATE TABLE test_tableam.test_range_dist (id integer, val integer) USING fake_am
+ALTER TABLE test_tableam.test_range_dist OWNER TO postgres
+(2 rows)
+
+--
+-- Test master_copy_shard_placement with a fake_am table
+--
+select a.shardid, a.nodeport
+FROM pg_dist_shard b, pg_dist_shard_placement a
+WHERE a.shardid=b.shardid AND logicalrelid = 'test_range_dist'::regclass::oid
+ORDER BY a.shardid, nodeport;
+shardid | nodeport
+---------------------------------------------------------------------
+60005 | 57637
+60006 | 57638
+(2 rows)
+
+SELECT master_copy_shard_placement(
+get_shard_id_for_distribution_column('test_range_dist', '1'),
+'localhost', :worker_1_port,
+'localhost', :worker_2_port,
+do_repair := false);
+master_copy_shard_placement
+---------------------------------------------------------------------
+
+(1 row)
+
+select a.shardid, a.nodeport
+FROM pg_dist_shard b, pg_dist_shard_placement a
+WHERE a.shardid=b.shardid AND logicalrelid = 'test_range_dist'::regclass::oid
+ORDER BY a.shardid, nodeport;
+shardid | nodeport
+---------------------------------------------------------------------
+60005 | 57637
+60005 | 57638
+60006 | 57638
+(3 rows)
+
+-- verify that data was copied correctly
+\c - - - :worker_1_port
+select * from test_tableam.test_range_dist_60005 ORDER BY id;
+WARNING: fake_scan_getnextslot
+WARNING: fake_scan_getnextslot
+WARNING: fake_scan_getnextslot
+WARNING: fake_scan_getnextslot
+WARNING: fake_scan_getnextslot
+WARNING: fake_scan_getnextslot
+WARNING: fake_scan_getnextslot
+id | val
+---------------------------------------------------------------------
+0 | 0
+1 | 1
+1 | -1
+2 | 4
+3 | 9
+7 | 9
+(6 rows)
+
+\c - - - :worker_2_port
+select * from test_tableam.test_range_dist_60005 ORDER BY id;
+WARNING: fake_scan_getnextslot
+WARNING: fake_scan_getnextslot
+WARNING: fake_scan_getnextslot
+WARNING: fake_scan_getnextslot
+WARNING: fake_scan_getnextslot
+WARNING: fake_scan_getnextslot
+WARNING: fake_scan_getnextslot
+id | val
+---------------------------------------------------------------------
+0 | 0
+1 | 1
+1 | -1
+2 | 4
+3 | 9
+7 | 9
+(6 rows)
+
+\c - - - :master_port
+--
+-- Test that partitioned tables work correctly with a fake_am table
+--
+-- parent using default am, one of children using fake_am
+CREATE TABLE test_partitioned(id int, p int, val int)
+PARTITION BY RANGE (p);
+CREATE TABLE test_partitioned_p1 PARTITION OF test_partitioned
+FOR VALUES FROM (1) TO (10);
+CREATE TABLE test_partitioned_p2 PARTITION OF test_partitioned
+FOR VALUES FROM (11) TO (20) USING fake_am;
+INSERT INTO test_partitioned VALUES (1, 5, -1), (2, 15, -2);
+WARNING: fake_tuple_insert
+SELECT create_distributed_table('test_partitioned', 'id');
+NOTICE: Copying data from local table...
+NOTICE: copying the data has completed
+DETAIL: The local data in the table is no longer visible, but is still on disk.
+HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.test_partitioned_p1$$)
+WARNING: fake_scan_getnextslot
+CONTEXT: SQL statement "SELECT EXISTS (SELECT 1 FROM public.test_partitioned_p2)"
+WARNING: fake_scan_getnextslot
+NOTICE: Copying data from local table...
+WARNING: fake_scan_getnextslot
+NOTICE: copying the data has completed
+DETAIL: The local data in the table is no longer visible, but is still on disk.
+HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.test_partitioned_p2$$)
+create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+INSERT INTO test_partitioned VALUES (3, 6, -6), (4, 16, -4);
+WARNING: fake_tuple_insert
+DETAIL: from localhost:xxxxx
+WARNING: fake_tuple_insert
+DETAIL: from localhost:xxxxx
+SELECT count(*) FROM test_partitioned;
+WARNING: fake_scan_getnextslot
+DETAIL: from localhost:xxxxx
+WARNING: fake_scan_getnextslot
+DETAIL: from localhost:xxxxx
+WARNING: fake_scan_getnextslot
+DETAIL: from localhost:xxxxx
+WARNING: fake_scan_getnextslot
+DETAIL: from localhost:xxxxx
+WARNING: fake_scan_getnextslot
+DETAIL: from localhost:xxxxx
+WARNING: fake_scan_getnextslot
+DETAIL: from localhost:xxxxx
+count
+---------------------------------------------------------------------
+4
+(1 row)
+
+DROP TABLE test_partitioned;
+-- Specifying access method in parent is not supported.
+-- If the below statement ever succeeds, add more tests for
+-- the case where children inherit access method from parent.
+CREATE TABLE test_partitioned(id int, p int, val int)
+PARTITION BY RANGE (p) USING fake_am;
+ERROR: specifying a table access method is not supported on a partitioned table
+\set VERBOSITY terse
+drop schema test_tableam cascade;
+NOTICE: drop cascades to 6 other objects
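The fake_tuple_delete / fake_tuple_update "not implemented" errors above come from the test access method's write callbacks. A minimal sketch of what such an append-only callback can look like, assuming the PostgreSQL 12 TableAmRoutine API; this is a guess at the fixture's shape, not code from this commit:

#include "postgres.h"
#include "access/tableam.h"

/*
 * Sketch: an append-only AM rejects deletes by erroring out. The signature
 * follows TableAmRoutine->tuple_delete in PostgreSQL 12.
 */
static TM_Result
fake_tuple_delete(Relation relation, ItemPointer tid, CommandId cid,
				  Snapshot snapshot, Snapshot crosscheck, bool wait,
				  TM_FailureData *tmfd, bool changingPart)
{
	elog(ERROR, "fake_tuple_delete not implemented");
	return TM_Ok;				/* unreachable; silences the compiler */
}

In the routine returned by fake_am_handler, this would presumably be wired up as .tuple_delete = fake_tuple_delete, with the scan and insert callbacks emitting the fake_scan_getnextslot / fake_tuple_insert warnings seen throughout this expected output.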
@@ -0,0 +1,6 @@
+SHOW server_version \gset
+SELECT substring(:'server_version', '\d+')::int > 11 AS server_version_above_eleven
+\gset
+\if :server_version_above_eleven
+\else
+\q
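This six-line file is presumably the alternative expected output for the tableam test: on PostgreSQL 11 and earlier, :server_version_above_eleven evaluates to false, psql takes the \else branch, and \q ends the session before any tableam-specific output is produced, since table access methods only exist in PostgreSQL 12+.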
@@ -82,6 +82,7 @@ test: set_operation_and_local_tables
 test: subqueries_deep subquery_view subquery_partitioning subquery_complex_target_list subqueries_not_supported subquery_in_where
 test: non_colocated_leaf_subquery_joins non_colocated_subquery_joins non_colocated_join_order
 test: subquery_prepared_statements pg12 cte_inline pg13
+test: tableam
 
 # ----------
 # Miscellaneous tests to check our query planning behavior
@@ -13,17 +13,6 @@ SET citus.next_placement_id TO 60000;
 create schema test_pg12;
 set search_path to test_pg12;
 
-CREATE FUNCTION blackhole_am_handler(internal)
-RETURNS table_am_handler
-AS 'citus'
-LANGUAGE C;
-CREATE ACCESS METHOD blackhole_am TYPE TABLE HANDLER blackhole_am_handler;
-
-create table test_am(id int, val int) using blackhole_am;
-insert into test_am values (1, 1);
--- Custom table access methods should be rejected
-select create_distributed_table('test_am','id');
-
 -- Test generated columns
 -- val1 after val2 to test https://github.com/citusdata/citus/issues/3538
 create table gen1 (
@@ -0,0 +1,149 @@
+SHOW server_version \gset
+SELECT substring(:'server_version', '\d+')::int > 11 AS server_version_above_eleven
+\gset
+\if :server_version_above_eleven
+\else
+\q
+\endif
+
+SET citus.shard_replication_factor to 1;
+SET citus.next_shard_id TO 60000;
+SET citus.next_placement_id TO 60000;
+SET citus.shard_count TO 4;
+
+create schema test_tableam;
+set search_path to test_tableam;
+
+SELECT public.run_command_on_coordinator_and_workers($Q$
+CREATE FUNCTION fake_am_handler(internal)
+RETURNS table_am_handler
+AS 'citus'
+LANGUAGE C;
+CREATE ACCESS METHOD fake_am TYPE TABLE HANDLER fake_am_handler;
+$Q$);
+
+--
+-- Hash distributed table using a non-default table access method
+--
+
+create table test_hash_dist(id int, val int) using fake_am;
+insert into test_hash_dist values (1, 1);
+select create_distributed_table('test_hash_dist','id');
+
+select * from test_hash_dist;
+insert into test_hash_dist values (1, 1);
+
+-- we should error on following, since this AM is append only
+SET client_min_messages TO ERROR;
+delete from test_hash_dist where id=1;
+update test_hash_dist set val=2 where id=2;
+RESET client_min_messages;
+
+-- ddl events should include "USING fake_am"
+SELECT * FROM master_get_table_ddl_events('test_hash_dist');
+
+--
+-- Reference table using a non-default table access method
+--
+
+create table test_ref(a int) using fake_am;
+insert into test_ref values (1);
+select create_reference_table('test_ref');
+
+select * from test_ref;
+insert into test_ref values (1);
+
+-- we should error on following, since this AM is append only
+SET client_min_messages TO ERROR;
+delete from test_ref;
+update test_ref set a=2;
+RESET client_min_messages;
+
+-- ddl events should include "USING fake_am"
+SELECT * FROM master_get_table_ddl_events('test_ref');
+
+-- replicate to coordinator
+SET client_min_messages TO WARNING;
+SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
+RESET client_min_messages;
+delete from test_ref;
+SELECT master_remove_node('localhost', :master_port);
+
+--
+-- Range partitioned table using a non-default table access method
+--
+
+CREATE TABLE test_range_dist(id int, val int) using fake_am;
+SELECT create_distributed_table('test_range_dist', 'id', 'range');
+CALL public.create_range_partitioned_shards('test_range_dist', '{"0","25"}','{"24","49"}');
+
+select * from test_range_dist;
+insert into test_range_dist values (1, 1);
+COPY test_range_dist FROM PROGRAM 'echo 0, 0 && echo 1, -1 && echo 2, 4 && echo 3, 9' WITH CSV;
+COPY test_range_dist FROM PROGRAM 'echo 25, 16 && echo 26, 1 && echo 27, 4 && echo 7, 9' WITH CSV;
+
+
+-- ddl events should include "USING fake_am"
+SELECT * FROM master_get_table_ddl_events('test_range_dist');
+
+--
+-- Test master_copy_shard_placement with a fake_am table
+--
+
+select a.shardid, a.nodeport
+FROM pg_dist_shard b, pg_dist_shard_placement a
+WHERE a.shardid=b.shardid AND logicalrelid = 'test_range_dist'::regclass::oid
+ORDER BY a.shardid, nodeport;
+
+SELECT master_copy_shard_placement(
+get_shard_id_for_distribution_column('test_range_dist', '1'),
+'localhost', :worker_1_port,
+'localhost', :worker_2_port,
+do_repair := false);
+
+select a.shardid, a.nodeport
+FROM pg_dist_shard b, pg_dist_shard_placement a
+WHERE a.shardid=b.shardid AND logicalrelid = 'test_range_dist'::regclass::oid
+ORDER BY a.shardid, nodeport;
+
+-- verify that data was copied correctly
+
+\c - - - :worker_1_port
+select * from test_tableam.test_range_dist_60005 ORDER BY id;
+
+\c - - - :worker_2_port
+select * from test_tableam.test_range_dist_60005 ORDER BY id;
+
+\c - - - :master_port
+
+--
+-- Test that partitioned tables work correctly with a fake_am table
+--
+
+-- parent using default am, one of children using fake_am
+CREATE TABLE test_partitioned(id int, p int, val int)
+PARTITION BY RANGE (p);
+
+CREATE TABLE test_partitioned_p1 PARTITION OF test_partitioned
+FOR VALUES FROM (1) TO (10);
+CREATE TABLE test_partitioned_p2 PARTITION OF test_partitioned
+FOR VALUES FROM (11) TO (20) USING fake_am;
+
+INSERT INTO test_partitioned VALUES (1, 5, -1), (2, 15, -2);
+
+SELECT create_distributed_table('test_partitioned', 'id');
+
+INSERT INTO test_partitioned VALUES (3, 6, -6), (4, 16, -4);
+
+SELECT count(*) FROM test_partitioned;
+
+DROP TABLE test_partitioned;
+
+-- Specifying access method in parent is not supported.
+-- If the below statement ever succeeds, add more tests for
+-- the case where children inherit access method from parent.
+CREATE TABLE test_partitioned(id int, p int, val int)
+PARTITION BY RANGE (p) USING fake_am;
+
+\set VERBOSITY terse
+drop schema test_tableam cascade;