mirror of https://github.com/citusdata/citus.git

Create a test 'pg12' for pg12 features & error on unsupported new features
Unsupported new features: COPY FROM WHERE, GENERATED ALWAYS AS, non-heap table access methods
branch pull/2844/head
parent e84fcc0b12
commit 693d4695d7
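In brief, here is how the new checks surface to users; a minimal sketch (assuming a PostgreSQL 12 coordinator with the Citus extension created; dist_tab is a hypothetical already-distributed table for the COPY case — the pg12 regression test added below exercises the real behavior):

-- GENERATED ALWAYS AS (...) STORED columns are rejected at distribution time
CREATE TABLE gen_tab (id int, doubled int GENERATED ALWAYS AS (id * 2) STORED);
SELECT create_distributed_table('gen_tab', 'id');
-- ERROR:  cannot distribute relation: gen_tab

-- non-heap table access methods are rejected as well
CREATE TABLE am_tab (id int) USING blackhole_am;  -- blackhole_am is defined by the new test
SELECT create_distributed_table('am_tab', 'id');
-- ERROR:  cannot distribute relations using non-heap access methods

-- PG12's COPY ... FROM ... WHERE is rejected for distributed tables
COPY dist_tab FROM STDIN WITH (FORMAT csv) WHERE id < 4;
-- ERROR:  Citus does not support COPY FROM with WHERE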
@@ -21,12 +21,16 @@
 #include "catalog/dependency.h"
 #include "catalog/index.h"
 #include "catalog/pg_am.h"
+#include "catalog/pg_attribute.h"
 #if (PG_VERSION_NUM < 110000)
 #include "catalog/pg_constraint_fn.h"
 #endif
 #include "catalog/pg_enum.h"
 #include "catalog/pg_extension.h"
 #include "catalog/pg_opclass.h"
+#if PG_VERSION_NUM >= 120000
+#include "catalog/pg_proc.h"
+#endif
 #include "catalog/pg_trigger.h"
 #include "commands/defrem.h"
 #include "commands/extension.h"
@@ -99,6 +103,8 @@ static bool LocalTableEmpty(Oid tableId);
 static void CopyLocalDataIntoShards(Oid relationId);
 static List * TupleDescColumnNameList(TupleDesc tupleDescriptor);
 static bool RelationUsesIdentityColumns(TupleDesc relationDesc);
+static bool RelationUsesGeneratedStoredColumns(TupleDesc relationDesc);
+static bool RelationUsesHeapAccessMethodOrNone(Relation relation);
 static bool CanUseExclusiveConnections(Oid relationId, bool localTableEmpty);

 /* exports for SQL callable functions */
@@ -645,6 +651,12 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
 	relationDesc = RelationGetDescr(relation);
 	relationName = RelationGetRelationName(relation);

+	if (!RelationUsesHeapAccessMethodOrNone(relation))
+	{
+		ereport(ERROR, (errmsg(
+							"cannot distribute relations using non-heap access methods")));
+	}
+
 #if PG_VERSION_NUM < 120000

 	/* verify target relation does not use WITH (OIDS) PostgreSQL feature */
@@ -666,6 +678,15 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
 							  "... AS IDENTITY.")));
 	}

+	/* verify target relation does not use generated columns */
+	if (RelationUsesGeneratedStoredColumns(relationDesc))
+	{
+		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+						errmsg("cannot distribute relation: %s", relationName),
+						errdetail("Distributed relations must not use GENERATED ALWAYS "
+								  "AS (...) STORED.")));
+	}
+
 	/* check for support function needed by specified partition method */
 	if (distributionMethod == DISTRIBUTE_BY_HASH)
 	{
@@ -1374,3 +1395,44 @@ RelationUsesIdentityColumns(TupleDesc relationDesc)

 	return false;
 }
+
+
+/*
+ * RelationUsesGeneratedStoredColumns returns whether a given relation uses
+ * the GENERATED ALWAYS AS (...) STORED feature introduced as of PostgreSQL 12.
+ */
+static bool
+RelationUsesGeneratedStoredColumns(TupleDesc relationDesc)
+{
+#if PG_VERSION_NUM >= 120000
+	int attributeIndex = 0;
+
+	for (attributeIndex = 0; attributeIndex < relationDesc->natts; attributeIndex++)
+	{
+		Form_pg_attribute attributeForm = TupleDescAttr(relationDesc, attributeIndex);
+
+		if (attributeForm->attgenerated == ATTRIBUTE_GENERATED_STORED)
+		{
+			return true;
+		}
+	}
+#endif
+
+	return false;
+}
+
+
+/*
+ * Returns whether the given relation uses the default heap access method (or none)
+ */
+static bool
+RelationUsesHeapAccessMethodOrNone(Relation relation)
+{
+#if PG_VERSION_NUM >= 120000
+
+	return relation->rd_rel->relkind != RELKIND_RELATION ||
+		   relation->rd_amhandler == HEAP_TABLE_AM_HANDLER_OID;
+#else
+	return true;
+#endif
+}
@@ -2803,6 +2803,14 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryS
 	{
 		if (copyStatement->is_from)
 		{
+#if PG_VERSION_NUM >= 120000
+			if (copyStatement->whereClause)
+			{
+				ereport(ERROR, (errmsg(
+									"Citus does not support COPY FROM with WHERE")));
+			}
+#endif
+
 			/* check permissions, we're bypassing postgres' normal checks */
 			if (!isCopyFromWorker)
 			{
@@ -2812,7 +2820,7 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryS
 			CitusCopyFrom(copyStatement, completionTag);
 			return NULL;
 		}
-		else if (!copyStatement->is_from)
+		else
 		{
 			/*
 			 * The copy code only handles SELECTs in COPY ... TO on master tables,
@@ -194,7 +194,7 @@ multi_ProcessUtility(PlannedStmt *pstmt,

 	/*
 	 * TRANSMIT used to be separate command, but to avoid patching the grammar
-	 * it's no overlaid onto COPY, but with FORMAT = 'transmit' instead of the
+	 * it's now overlaid onto COPY, but with FORMAT = 'transmit' instead of the
 	 * normal FORMAT options.
 	 */
 	if (IsTransmitStmt(parsetree))
@@ -0,0 +1,502 @@
+/*-------------------------------------------------------------------------
+ *
+ * blackhole_am.c
+ *	  blackhole table access method code
+ *
+ * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ *	  Copied from https://github.com/michaelpq/pg_plugins/blob/master/blackhole_am/blackhole_am.c
+ *
+ *
+ * NOTES
+ *	  This file introduces the table access method blackhole, which can
+ *	  be used as a template for other table access methods, and guarantees
+ *	  that any data inserted into it gets sent to the void.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+/* *INDENT-OFF* */
+#include "postgres.h"
+
+#if PG_VERSION_NUM >= 120000
+
+#include "access/tableam.h"
+#include "access/heapam.h"
+#include "access/amapi.h"
+#include "catalog/index.h"
+#include "commands/vacuum.h"
+#include "executor/tuptable.h"
+
+PG_FUNCTION_INFO_V1(blackhole_am_handler);
+
+/* Base structures for scans */
+typedef struct BlackholeScanDescData
+{
+	TableScanDescData rs_base; /* AM independent part of the descriptor */
+
+	/* Add more fields here as needed by the AM. */
+} BlackholeScanDescData;
+typedef struct BlackholeScanDescData *BlackholeScanDesc;
+
+static const TableAmRoutine blackhole_methods;
+
+
+/* ------------------------------------------------------------------------
+ * Slot related callbacks for blackhole AM
+ * ------------------------------------------------------------------------
+ */
+
+static const TupleTableSlotOps *
+blackhole_slot_callbacks(Relation relation)
+{
+	/*
+	 * Here you would most likely want to invent your own set of
+	 * slot callbacks for your AM.
+	 */
+	return &TTSOpsMinimalTuple;
+}
+
+/* ------------------------------------------------------------------------
+ * Table Scan Callbacks for blackhole AM
+ * ------------------------------------------------------------------------
+ */
+
+static TableScanDesc
+blackhole_scan_begin(Relation relation, Snapshot snapshot,
+					 int nkeys, ScanKey key,
+					 ParallelTableScanDesc parallel_scan,
+					 uint32 flags)
+{
+	BlackholeScanDesc scan;
+
+	scan = (BlackholeScanDesc) palloc(sizeof(BlackholeScanDescData));
+
+	scan->rs_base.rs_rd = relation;
+	scan->rs_base.rs_snapshot = snapshot;
+	scan->rs_base.rs_nkeys = nkeys;
+	scan->rs_base.rs_flags = flags;
+	scan->rs_base.rs_parallel = parallel_scan;
+
+	return (TableScanDesc) scan;
+}
+
+static void
+blackhole_scan_end(TableScanDesc sscan)
+{
+	BlackholeScanDesc scan = (BlackholeScanDesc) sscan;
+	pfree(scan);
+}
+
+static void
+blackhole_scan_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
+					  bool allow_strat, bool allow_sync, bool allow_pagemode)
+{
+	/* nothing to do */
+}
+
+static bool
+blackhole_scan_getnextslot(TableScanDesc sscan, ScanDirection direction,
+						   TupleTableSlot *slot)
+{
+	/* nothing to do */
+	return false;
+}
+
+/* ------------------------------------------------------------------------
+ * Index Scan Callbacks for blackhole AM
+ * ------------------------------------------------------------------------
+ */
+
+static IndexFetchTableData *
+blackhole_index_fetch_begin(Relation rel)
+{
+	return NULL;
+}
+
+static void
+blackhole_index_fetch_reset(IndexFetchTableData *scan)
+{
+	/* nothing to do here */
+}
+
+static void
+blackhole_index_fetch_end(IndexFetchTableData *scan)
+{
+	/* nothing to do here */
+}
+
+static bool
+blackhole_index_fetch_tuple(struct IndexFetchTableData *scan,
+							ItemPointer tid,
+							Snapshot snapshot,
+							TupleTableSlot *slot,
+							bool *call_again, bool *all_dead)
+{
+	/* there is no data */
+	return 0;
+}
+
+
+/* ------------------------------------------------------------------------
+ * Callbacks for non-modifying operations on individual tuples for
+ * blackhole AM.
+ * ------------------------------------------------------------------------
+ */
+
+static bool
+blackhole_fetch_row_version(Relation relation,
+							ItemPointer tid,
+							Snapshot snapshot,
+							TupleTableSlot *slot)
+{
+	/* nothing to do */
+	return false;
+}
+
+static void
+blackhole_get_latest_tid(TableScanDesc sscan,
+						 ItemPointer tid)
+{
+	/* nothing to do */
+}
+
+static bool
+blackhole_tuple_tid_valid(TableScanDesc scan, ItemPointer tid)
+{
+	return false;
+}
+
+static bool
+blackhole_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot,
+								   Snapshot snapshot)
+{
+	return false;
+}
+
+static TransactionId
+blackhole_compute_xid_horizon_for_tuples(Relation rel,
+										 ItemPointerData *tids,
+										 int nitems)
+{
+	return InvalidTransactionId;
+}
+
+/* ----------------------------------------------------------------------------
+ *  Functions for manipulations of physical tuples for blackhole AM.
+ * ----------------------------------------------------------------------------
+ */
+
+static void
+blackhole_tuple_insert(Relation relation, TupleTableSlot *slot,
+					   CommandId cid, int options, BulkInsertState bistate)
+{
+	/* nothing to do */
+}
+
+static void
+blackhole_tuple_insert_speculative(Relation relation, TupleTableSlot *slot,
+								   CommandId cid, int options,
+								   BulkInsertState bistate,
+								   uint32 specToken)
+{
+	/* nothing to do */
+}
+
+static void
+blackhole_tuple_complete_speculative(Relation relation, TupleTableSlot *slot,
+									 uint32 spekToken, bool succeeded)
+{
+	/* nothing to do */
+}
+
+static void
+blackhole_multi_insert(Relation relation, TupleTableSlot **slots,
+					   int ntuples, CommandId cid, int options,
+					   BulkInsertState bistate)
+{
+	/* nothing to do */
+}
+
+static TM_Result
+blackhole_tuple_delete(Relation relation, ItemPointer tid, CommandId cid,
+					   Snapshot snapshot, Snapshot crosscheck, bool wait,
+					   TM_FailureData *tmfd, bool changingPart)
+{
+	/* nothing to do, so it is always OK */
+	return TM_Ok;
+}
+
+
+static TM_Result
+blackhole_tuple_update(Relation relation, ItemPointer otid,
+					   TupleTableSlot *slot, CommandId cid,
+					   Snapshot snapshot, Snapshot crosscheck,
+					   bool wait, TM_FailureData *tmfd,
+					   LockTupleMode *lockmode, bool *update_indexes)
+{
+	/* nothing to do, so it is always OK */
+	return TM_Ok;
+}
+
+static TM_Result
+blackhole_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot,
+					 TupleTableSlot *slot, CommandId cid, LockTupleMode mode,
+					 LockWaitPolicy wait_policy, uint8 flags,
+					 TM_FailureData *tmfd)
+{
+	/* nothing to do, so it is always OK */
+	return TM_Ok;
+}
+
+static void
+blackhole_finish_bulk_insert(Relation relation, int options)
+{
+	/* nothing to do */
+}
+
+
+/* ------------------------------------------------------------------------
+ * DDL related callbacks for blackhole AM.
+ * ------------------------------------------------------------------------
+ */
+
+static void
+blackhole_relation_set_new_filenode(Relation rel,
+									const RelFileNode *newrnode,
+									char persistence,
+									TransactionId *freezeXid,
+									MultiXactId *minmulti)
+{
+	/* nothing to do */
+}
+
+static void
+blackhole_relation_nontransactional_truncate(Relation rel)
+{
+	/* nothing to do */
+}
+
+static void
+blackhole_copy_data(Relation rel, const RelFileNode *newrnode)
+{
+	/* there is no data */
+}
+
+static void
+blackhole_copy_for_cluster(Relation OldTable, Relation NewTable,
+						   Relation OldIndex, bool use_sort,
+						   TransactionId OldestXmin,
+						   TransactionId *xid_cutoff,
+						   MultiXactId *multi_cutoff,
+						   double *num_tuples,
+						   double *tups_vacuumed,
+						   double *tups_recently_dead)
+{
+	/* no data, so nothing to do */
+}
+
+static void
+blackhole_vacuum(Relation onerel, VacuumParams *params,
+				 BufferAccessStrategy bstrategy)
+{
+	/* no data, so nothing to do */
+}
+
+static bool
+blackhole_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno,
+								  BufferAccessStrategy bstrategy)
+{
+	/* no data, so no point to analyze next block */
+	return false;
+}
+
+static bool
+blackhole_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin,
+								  double *liverows, double *deadrows,
+								  TupleTableSlot *slot)
+{
+	/* no data, so no point to analyze next tuple */
+	return false;
+}
+
+static double
+blackhole_index_build_range_scan(Relation tableRelation,
+								 Relation indexRelation,
+								 IndexInfo *indexInfo,
+								 bool allow_sync,
+								 bool anyvisible,
+								 bool progress,
+								 BlockNumber start_blockno,
+								 BlockNumber numblocks,
+								 IndexBuildCallback callback,
+								 void *callback_state,
+								 TableScanDesc scan)
+{
+	/* no data, so no tuples */
+	return 0;
+}
+
+static void
+blackhole_index_validate_scan(Relation tableRelation,
+							  Relation indexRelation,
+							  IndexInfo *indexInfo,
+							  Snapshot snapshot,
+							  ValidateIndexState *state)
+{
+	/* nothing to do */
+}
+
+
+/* ------------------------------------------------------------------------
+ * Miscellaneous callbacks for the blackhole AM
+ * ------------------------------------------------------------------------
+ */
+
+static uint64
+blackhole_relation_size(Relation rel, ForkNumber forkNumber)
+{
+	/* there is nothing */
+	return 0;
+}
+
+/*
+ * Check to see whether the table needs a TOAST table.
+ */
+static bool
+blackhole_relation_needs_toast_table(Relation rel)
+{
+	/* no data, so no toast table needed */
+	return false;
+}
+
+
+/* ------------------------------------------------------------------------
+ * Planner related callbacks for the blackhole AM
+ * ------------------------------------------------------------------------
+ */
+
+static void
+blackhole_estimate_rel_size(Relation rel, int32 *attr_widths,
+							BlockNumber *pages, double *tuples,
+							double *allvisfrac)
+{
+	/* no data available */
+	*attr_widths = 0;
+	*tuples = 0;
+	*allvisfrac = 0;
+	*pages = 0;
+}
+
+
+/* ------------------------------------------------------------------------
+ * Executor related callbacks for the blackhole AM
+ * ------------------------------------------------------------------------
+ */
+
+static bool
+blackhole_scan_bitmap_next_block(TableScanDesc scan,
+								 TBMIterateResult *tbmres)
+{
+	/* no data, so no point to scan next block */
+	return false;
+}
+
+static bool
+blackhole_scan_bitmap_next_tuple(TableScanDesc scan,
+								 TBMIterateResult *tbmres,
+								 TupleTableSlot *slot)
+{
+	/* no data, so no point to scan next tuple */
+	return false;
+}
+
+static bool
+blackhole_scan_sample_next_block(TableScanDesc scan,
+								 SampleScanState *scanstate)
+{
+	/* no data, so no point to scan next block for sampling */
+	return false;
+}
+
+static bool
+blackhole_scan_sample_next_tuple(TableScanDesc scan,
+								 SampleScanState *scanstate,
+								 TupleTableSlot *slot)
+{
+	/* no data, so no point to scan next tuple for sampling */
+	return false;
+}
+
+
+/* ------------------------------------------------------------------------
+ * Definition of the blackhole table access method.
+ * ------------------------------------------------------------------------
+ */
+
+static const TableAmRoutine blackhole_methods = {
+	.type = T_TableAmRoutine,
+
+	.slot_callbacks = blackhole_slot_callbacks,
+
+	.scan_begin = blackhole_scan_begin,
+	.scan_end = blackhole_scan_end,
+	.scan_rescan = blackhole_scan_rescan,
+	.scan_getnextslot = blackhole_scan_getnextslot,
+
+	/* these are common helper functions */
+	.parallelscan_estimate = table_block_parallelscan_estimate,
+	.parallelscan_initialize = table_block_parallelscan_initialize,
+	.parallelscan_reinitialize = table_block_parallelscan_reinitialize,
+
+	.index_fetch_begin = blackhole_index_fetch_begin,
+	.index_fetch_reset = blackhole_index_fetch_reset,
+	.index_fetch_end = blackhole_index_fetch_end,
+	.index_fetch_tuple = blackhole_index_fetch_tuple,
+
+	.tuple_insert = blackhole_tuple_insert,
+	.tuple_insert_speculative = blackhole_tuple_insert_speculative,
+	.tuple_complete_speculative = blackhole_tuple_complete_speculative,
+	.multi_insert = blackhole_multi_insert,
+	.tuple_delete = blackhole_tuple_delete,
+	.tuple_update = blackhole_tuple_update,
+	.tuple_lock = blackhole_tuple_lock,
+	.finish_bulk_insert = blackhole_finish_bulk_insert,
+
+	.tuple_fetch_row_version = blackhole_fetch_row_version,
+	.tuple_get_latest_tid = blackhole_get_latest_tid,
+	.tuple_tid_valid = blackhole_tuple_tid_valid,
+	.tuple_satisfies_snapshot = blackhole_tuple_satisfies_snapshot,
+	.compute_xid_horizon_for_tuples = blackhole_compute_xid_horizon_for_tuples,
+
+	.relation_set_new_filenode = blackhole_relation_set_new_filenode,
+	.relation_nontransactional_truncate = blackhole_relation_nontransactional_truncate,
+	.relation_copy_data = blackhole_copy_data,
+	.relation_copy_for_cluster = blackhole_copy_for_cluster,
+	.relation_vacuum = blackhole_vacuum,
+	.scan_analyze_next_block = blackhole_scan_analyze_next_block,
+	.scan_analyze_next_tuple = blackhole_scan_analyze_next_tuple,
+	.index_build_range_scan = blackhole_index_build_range_scan,
+	.index_validate_scan = blackhole_index_validate_scan,
+
+	.relation_size = blackhole_relation_size,
+	.relation_needs_toast_table = blackhole_relation_needs_toast_table,
+
+	.relation_estimate_size = blackhole_estimate_rel_size,
+
+	.scan_bitmap_next_block = blackhole_scan_bitmap_next_block,
+	.scan_bitmap_next_tuple = blackhole_scan_bitmap_next_tuple,
+	.scan_sample_next_block = blackhole_scan_sample_next_block,
+	.scan_sample_next_tuple = blackhole_scan_sample_next_tuple
+};
+
+
+Datum
+blackhole_am_handler(PG_FUNCTION_ARGS)
+{
+	PG_RETURN_POINTER(&blackhole_methods);
+}
+
+#endif
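For reference, wiring a table access method like the one above into SQL takes two statements — a handler function and CREATE ACCESS METHOD — after which tables opt in via USING; a short sketch (the new pg12 test below runs exactly these statements):

-- register the handler exported by blackhole_am.c, then use the AM
CREATE FUNCTION blackhole_am_handler(internal)
RETURNS table_am_handler
AS 'citus' LANGUAGE C;
CREATE ACCESS METHOD blackhole_am TYPE TABLE HANDLER blackhole_am_handler;
CREATE TABLE blackhole_tab (id int) USING blackhole_am;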
@@ -0,0 +1,368 @@
+SHOW server_version \gset
+SELECT substring(:'server_version', '\d+')::int > 11 AS server_version_above_eleven
+\gset
+\if :server_version_above_eleven
+\else
+\q
+\endif
+SET citus.shard_replication_factor to 1;
+SET citus.next_shard_id TO 60000;
+SET citus.next_placement_id TO 60000;
+create schema test_pg12;
+set search_path to test_pg12;
+CREATE FUNCTION blackhole_am_handler(internal)
+RETURNS table_am_handler
+AS 'citus'
+LANGUAGE C;
+CREATE ACCESS METHOD blackhole_am TYPE TABLE HANDLER blackhole_am_handler;
+create table test_am(id int, val int) using blackhole_am;
+insert into test_am values (1, 1);
+-- Custom table access methods should be rejected
+select create_distributed_table('test_am','id');
+ERROR:  cannot distribute relations using non-heap access methods
+-- Test generated columns
+create table gen1 (
+	id int,
+	val1 int,
+	val2 int GENERATED ALWAYS AS (val1 + 2) STORED
+);
+create table gen2 (
+	id int,
+	val1 int,
+	val2 int GENERATED ALWAYS AS (val1 + 2) STORED
+);
+insert into gen1 (id, val1) values (1,4),(3,6),(5,2),(7,2);
+insert into gen2 (id, val1) values (1,4),(3,6),(5,2),(7,2);
+select * from create_distributed_table('gen1', 'id');
+ERROR:  cannot distribute relation: gen1
+DETAIL:  Distributed relations must not use GENERATED ALWAYS AS (...) STORED.
+select * from create_distributed_table('gen2', 'val2');
+ERROR:  cannot distribute relation: gen2
+DETAIL:  Distributed relations must not use GENERATED ALWAYS AS (...) STORED.
+insert into gen1 (id, val1) values (2,4),(4,6),(6,2),(8,2);
+insert into gen2 (id, val1) values (2,4),(4,6),(6,2),(8,2);
+select * from gen1;
+ id | val1 | val2
+----+------+------
+  1 |    4 |    6
+  3 |    6 |    8
+  5 |    2 |    4
+  7 |    2 |    4
+  2 |    4 |    6
+  4 |    6 |    8
+  6 |    2 |    4
+  8 |    2 |    4
+(8 rows)
+
+select * from gen2;
+ id | val1 | val2
+----+------+------
+  1 |    4 |    6
+  3 |    6 |    8
+  5 |    2 |    4
+  7 |    2 |    4
+  2 |    4 |    6
+  4 |    6 |    8
+  6 |    2 |    4
+  8 |    2 |    4
+(8 rows)
+
+-- Test new VACUUM/ANALYZE options
+analyze (skip_locked) gen1;
+vacuum (skip_locked) gen1;
+vacuum (truncate 0) gen1;
+vacuum (index_cleanup 1) gen1;
+-- COPY FROM
+create table cptest (id int, val int);
+select create_distributed_table('cptest', 'id');
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+copy cptest from STDIN with csv where val < 4;
+ERROR:  Citus does not support COPY FROM with WHERE
+1,6
+2,3
+3,2
+4,9
+5,4
+\.
+invalid command \.
+select sum(id), sum(val) from cptest;
+ERROR:  syntax error at or near "1"
+LINE 1: 1,6
+        ^
+-- CTE materialized/not materialized
+CREATE TABLE single_hash_repartition_first (id int, sum int, avg float);
+CREATE TABLE single_hash_repartition_second (id int primary key, sum int, avg float);
+SELECT create_distributed_table('single_hash_repartition_first', 'id');
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+SELECT create_distributed_table('single_hash_repartition_second', 'id');
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+INSERT INTO single_hash_repartition_first
+SELECT i, i * 3, i * 0.3
+FROM generate_series(0, 100) i;
+INSERT INTO single_hash_repartition_second
+SELECT i * 2, i * 5, i * 0.6
+FROM generate_series(0, 100) i;
+-- a sample router query with NOT MATERIALIZED
+-- which pushes down the filters to the CTE
+SELECT public.coordinator_plan($Q$
+EXPLAIN (COSTS OFF)
+WITH cte1 AS NOT MATERIALIZED
+(
+	SELECT id
+	FROM single_hash_repartition_first t1
+)
+SELECT count(*)
+FROM cte1, single_hash_repartition_second
+WHERE cte1.id = single_hash_repartition_second.id AND single_hash_repartition_second.id = 45;
+$Q$);
+       coordinator_plan
+------------------------------
+ Custom Scan (Citus Adaptive)
+   Task Count: 1
+(2 rows)
+
+-- same query, without NOT MATERIALIZED, which is already default
+-- which pushes down the filters to the CTE
+SELECT public.coordinator_plan($Q$
+EXPLAIN (COSTS OFF)
+WITH cte1 AS
+(
+	SELECT id
+	FROM single_hash_repartition_first t1
+)
+SELECT count(*)
+FROM cte1, single_hash_repartition_second
+WHERE cte1.id = single_hash_repartition_second.id AND single_hash_repartition_second.id = 45;
+$Q$);
+       coordinator_plan
+------------------------------
+ Custom Scan (Citus Adaptive)
+   Task Count: 1
+(2 rows)
+
+-- same query with MATERIALIZED
+-- which prevents pushing down filters to the CTE,
+-- thus becoming a real-time query
+SELECT public.coordinator_plan($Q$
+EXPLAIN (COSTS OFF)
+WITH cte1 AS MATERIALIZED
+(
+	SELECT id
+	FROM single_hash_repartition_first t1
+)
+SELECT count(*)
+FROM cte1, single_hash_repartition_second
+WHERE cte1.id = single_hash_repartition_second.id AND single_hash_repartition_second.id = 45;
+$Q$);
+             coordinator_plan
+------------------------------------------
+ Custom Scan (Citus Adaptive)
+   ->  Distributed Subplan 5_1
+         ->  Custom Scan (Citus Adaptive)
+               Task Count: 4
+(4 rows)
+
+-- similar query with MATERIALIZED
+-- now manually have the same filter in the CTE
+-- thus becoming a router query again
+SELECT public.coordinator_plan($Q$
+EXPLAIN (COSTS OFF)
+WITH cte1 AS MATERIALIZED
+(
+	SELECT id
+	FROM single_hash_repartition_first t1
+	WHERE id = 45
+)
+SELECT count(*)
+FROM cte1, single_hash_repartition_second
+WHERE cte1.id = single_hash_repartition_second.id AND single_hash_repartition_second.id = 45;
+$Q$);
+       coordinator_plan
+------------------------------
+ Custom Scan (Citus Adaptive)
+   Task Count: 1
+(2 rows)
+
+-- now, have a real-time query without MATERIALIZED
+-- these are sanity checks, because all of the CTEs are recursively
+-- planned and there is no benefit that Citus can have
+SELECT public.coordinator_plan($Q$
+EXPLAIN (COSTS OFF)
+WITH cte1 AS MATERIALIZED
+(
+	SELECT id
+	FROM single_hash_repartition_first t1
+	WHERE sum = 45
+)
+SELECT count(*)
+FROM cte1, single_hash_repartition_second
+WHERE cte1.id = single_hash_repartition_second.id AND single_hash_repartition_second.sum = 45;
+$Q$);
+                coordinator_plan
+------------------------------------------------
+ Aggregate
+   ->  Custom Scan (Citus Adaptive)
+         ->  Distributed Subplan 8_1
+               ->  Custom Scan (Citus Adaptive)
+                     Task Count: 4
+(5 rows)
+
+SELECT public.coordinator_plan($Q$
+EXPLAIN (COSTS OFF)
+WITH cte1 AS NOT MATERIALIZED
+(
+	SELECT id
+	FROM single_hash_repartition_first t1
+	WHERE sum = 45
+)
+SELECT count(*)
+FROM cte1, single_hash_repartition_second
+WHERE cte1.id = single_hash_repartition_second.id AND single_hash_repartition_second.sum = 45;
+$Q$);
+                coordinator_plan
+------------------------------------------------
+ Aggregate
+   ->  Custom Scan (Citus Adaptive)
+         ->  Distributed Subplan 10_1
+               ->  Custom Scan (Citus Adaptive)
+                     Task Count: 4
+(5 rows)
+
+-- Foreign keys to partition tables
+CREATE TABLE collections_list (
+	key bigint,
+	collection_id integer,
+	value numeric,
+	PRIMARY KEY(key, collection_id)
+) PARTITION BY LIST (collection_id);
+CREATE TABLE collections_list_0
+	PARTITION OF collections_list (key, collection_id, value)
+	FOR VALUES IN ( 0 );
+CREATE TABLE collections_list_1
+	PARTITION OF collections_list (key, collection_id, value)
+	FOR VALUES IN ( 1 );
+CREATE TABLE collection_users
+	(used_id integer, collection_id integer, key bigint);
+ALTER TABLE collection_users
+	ADD CONSTRAINT collection_users_fkey FOREIGN KEY (key, collection_id) REFERENCES collections_list (key, collection_id);
+-- sanity check for postgres
+INSERT INTO collections_list VALUES (1, 0, '1.1');
+INSERT INTO collection_users VALUES (1, 0, 1);
+-- should fail because of fkey
+INSERT INTO collection_users VALUES (1, 1000, 1);
+ERROR:  insert or update on table "collection_users" violates foreign key constraint "collection_users_fkey"
+DETAIL:  Key (key, collection_id)=(1, 1000) is not present in table "collections_list".
+SELECT create_distributed_table('collections_list', 'key');
+NOTICE:  Copying data from local table...
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+SELECT create_distributed_table('collection_users', 'key');
+NOTICE:  Copying data from local table...
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+-- should still fail because of fkey
+INSERT INTO collection_users VALUES (1, 1000, 1);
+ERROR:  insert or update on table "collection_users_60024" violates foreign key constraint "collection_users_fkey_60024"
+DETAIL:  Key (key, collection_id)=(1, 1000) is not present in table "collections_list_60012".
+CONTEXT:  while executing command on localhost:57637
+-- whereas new record with partition should go through
+INSERT INTO collections_list VALUES (2, 1, '1.2');
+INSERT INTO collection_users VALUES (5, 1, 2);
+-- AND CHAIN
+CREATE TABLE test (x int, y int);
+INSERT INTO test (x,y) SELECT i,i*3 from generate_series(1, 100) i;
+SELECT create_distributed_table('test', 'x');
+NOTICE:  Copying data from local table...
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+-- single shard queries with CHAIN
+BEGIN;
+UPDATE test SET y = 15 WHERE x = 1;
+COMMIT AND CHAIN;
+SELECT * FROM test WHERE x = 1;
+ x | y
+---+----
+ 1 | 15
+(1 row)
+
+COMMIT;
+BEGIN;
+UPDATE test SET y = 20 WHERE x = 1;
+ROLLBACK AND CHAIN;
+SELECT * FROM test WHERE x = 1;
+ x | y
+---+----
+ 1 | 15
+(1 row)
+
+COMMIT;
+-- multi shard queries with CHAIN
+BEGIN;
+UPDATE test SET y = 25;
+COMMIT AND CHAIN;
+SELECT DISTINCT y FROM test;
+ y
+----
+ 25
+(1 row)
+
+COMMIT;
+BEGIN;
+UPDATE test SET y = 30;
+ROLLBACK AND CHAIN;
+SELECT DISTINCT y FROM test;
+ y
+----
+ 25
+(1 row)
+
+COMMIT;
+-- does read only carry over?
+BEGIN READ ONLY;
+COMMIT AND CHAIN;
+UPDATE test SET y = 35;
+ERROR:  cannot execute UPDATE in a read-only transaction
+COMMIT;
+SELECT DISTINCT y FROM test;
+ y
+----
+ 25
+(1 row)
+
+BEGIN READ ONLY;
+ROLLBACK AND CHAIN;
+UPDATE test SET y = 40;
+ERROR:  cannot execute UPDATE in a read-only transaction
+COMMIT;
+SELECT DISTINCT y FROM test;
+ y
+----
+ 25
+(1 row)
+
+\set VERBOSITY terse
+drop schema test_pg12 cascade;
+NOTICE:  drop cascades to 11 other objects
+\set VERBOSITY default
+SET citus.shard_replication_factor to 2;
@@ -0,0 +1,6 @@
+SHOW server_version \gset
+SELECT substring(:'server_version', '\d+')::int > 11 AS server_version_above_eleven
+\gset
+\if :server_version_above_eleven
+\else
+\q
@@ -56,7 +56,7 @@ test: multi_partitioning_utils multi_partitioning replicated_partitioned_table
 test: subquery_basics subquery_local_tables subquery_executors subquery_and_cte set_operations set_operation_and_local_tables
 test: subqueries_deep subquery_view subquery_partitioning subquery_complex_target_list subqueries_not_supported subquery_in_where
 test: non_colocated_leaf_subquery_joins non_colocated_subquery_joins non_colocated_join_order
-test: subquery_prepared_statements
+test: subquery_prepared_statements pg12

 # ----------
 # Miscellaneous tests to check our query planning behavior

@@ -276,4 +276,3 @@ test: multi_task_string_size
 # connection encryption tests
 # ---------
 test: ssl_by_default
-
@@ -0,0 +1,262 @@
+SHOW server_version \gset
+SELECT substring(:'server_version', '\d+')::int > 11 AS server_version_above_eleven
+\gset
+\if :server_version_above_eleven
+\else
+\q
+\endif
+
+SET citus.shard_replication_factor to 1;
+SET citus.next_shard_id TO 60000;
+SET citus.next_placement_id TO 60000;
+
+create schema test_pg12;
+set search_path to test_pg12;
+
+CREATE FUNCTION blackhole_am_handler(internal)
+RETURNS table_am_handler
+AS 'citus'
+LANGUAGE C;
+CREATE ACCESS METHOD blackhole_am TYPE TABLE HANDLER blackhole_am_handler;
+
+create table test_am(id int, val int) using blackhole_am;
+insert into test_am values (1, 1);
+-- Custom table access methods should be rejected
+select create_distributed_table('test_am','id');
+
+-- Test generated columns
+create table gen1 (
+	id int,
+	val1 int,
+	val2 int GENERATED ALWAYS AS (val1 + 2) STORED
+);
+create table gen2 (
+	id int,
+	val1 int,
+	val2 int GENERATED ALWAYS AS (val1 + 2) STORED
+);
+
+insert into gen1 (id, val1) values (1,4),(3,6),(5,2),(7,2);
+insert into gen2 (id, val1) values (1,4),(3,6),(5,2),(7,2);
+
+select * from create_distributed_table('gen1', 'id');
+select * from create_distributed_table('gen2', 'val2');
+
+insert into gen1 (id, val1) values (2,4),(4,6),(6,2),(8,2);
+insert into gen2 (id, val1) values (2,4),(4,6),(6,2),(8,2);
+
+select * from gen1;
+select * from gen2;
+
+-- Test new VACUUM/ANALYZE options
+analyze (skip_locked) gen1;
+vacuum (skip_locked) gen1;
+vacuum (truncate 0) gen1;
+vacuum (index_cleanup 1) gen1;
+
+-- COPY FROM
+create table cptest (id int, val int);
+select create_distributed_table('cptest', 'id');
+copy cptest from STDIN with csv where val < 4;
+1,6
+2,3
+3,2
+4,9
+5,4
+\.
+select sum(id), sum(val) from cptest;
+
+-- CTE materialized/not materialized
+CREATE TABLE single_hash_repartition_first (id int, sum int, avg float);
+CREATE TABLE single_hash_repartition_second (id int primary key, sum int, avg float);
+
+SELECT create_distributed_table('single_hash_repartition_first', 'id');
+SELECT create_distributed_table('single_hash_repartition_second', 'id');
+
+INSERT INTO single_hash_repartition_first
+SELECT i, i * 3, i * 0.3
+FROM generate_series(0, 100) i;
+
+INSERT INTO single_hash_repartition_second
+SELECT i * 2, i * 5, i * 0.6
+FROM generate_series(0, 100) i;
+
+-- a sample router query with NOT MATERIALIZED
+-- which pushes down the filters to the CTE
+SELECT public.coordinator_plan($Q$
+EXPLAIN (COSTS OFF)
+WITH cte1 AS NOT MATERIALIZED
+(
+	SELECT id
+	FROM single_hash_repartition_first t1
+)
+SELECT count(*)
+FROM cte1, single_hash_repartition_second
+WHERE cte1.id = single_hash_repartition_second.id AND single_hash_repartition_second.id = 45;
+$Q$);
+
+-- same query, without NOT MATERIALIZED, which is already default
+-- which pushes down the filters to the CTE
+SELECT public.coordinator_plan($Q$
+EXPLAIN (COSTS OFF)
+WITH cte1 AS
+(
+	SELECT id
+	FROM single_hash_repartition_first t1
+)
+SELECT count(*)
+FROM cte1, single_hash_repartition_second
+WHERE cte1.id = single_hash_repartition_second.id AND single_hash_repartition_second.id = 45;
+$Q$);
+
+-- same query with MATERIALIZED
+-- which prevents pushing down filters to the CTE,
+-- thus becoming a real-time query
+SELECT public.coordinator_plan($Q$
+EXPLAIN (COSTS OFF)
+WITH cte1 AS MATERIALIZED
+(
+	SELECT id
+	FROM single_hash_repartition_first t1
+)
+SELECT count(*)
+FROM cte1, single_hash_repartition_second
+WHERE cte1.id = single_hash_repartition_second.id AND single_hash_repartition_second.id = 45;
+$Q$);
+
+-- similar query with MATERIALIZED
+-- now manually have the same filter in the CTE
+-- thus becoming a router query again
+SELECT public.coordinator_plan($Q$
+EXPLAIN (COSTS OFF)
+WITH cte1 AS MATERIALIZED
+(
+	SELECT id
+	FROM single_hash_repartition_first t1
+	WHERE id = 45
+)
+SELECT count(*)
+FROM cte1, single_hash_repartition_second
+WHERE cte1.id = single_hash_repartition_second.id AND single_hash_repartition_second.id = 45;
+$Q$);
+
+-- now, have a real-time query without MATERIALIZED
+-- these are sanity checks, because all of the CTEs are recursively
+-- planned and there is no benefit that Citus can have
+SELECT public.coordinator_plan($Q$
+EXPLAIN (COSTS OFF)
+WITH cte1 AS MATERIALIZED
+(
+	SELECT id
+	FROM single_hash_repartition_first t1
+	WHERE sum = 45
+)
+SELECT count(*)
+FROM cte1, single_hash_repartition_second
+WHERE cte1.id = single_hash_repartition_second.id AND single_hash_repartition_second.sum = 45;
+$Q$);
+
+SELECT public.coordinator_plan($Q$
+EXPLAIN (COSTS OFF)
+WITH cte1 AS NOT MATERIALIZED
+(
+	SELECT id
+	FROM single_hash_repartition_first t1
+	WHERE sum = 45
+)
+SELECT count(*)
+FROM cte1, single_hash_repartition_second
+WHERE cte1.id = single_hash_repartition_second.id AND single_hash_repartition_second.sum = 45;
+$Q$);
+
+-- Foreign keys to partition tables
+CREATE TABLE collections_list (
+	key bigint,
+	collection_id integer,
+	value numeric,
+	PRIMARY KEY(key, collection_id)
+) PARTITION BY LIST (collection_id);
+
+CREATE TABLE collections_list_0
+	PARTITION OF collections_list (key, collection_id, value)
+	FOR VALUES IN ( 0 );
+CREATE TABLE collections_list_1
+	PARTITION OF collections_list (key, collection_id, value)
+	FOR VALUES IN ( 1 );
+
+CREATE TABLE collection_users
+	(used_id integer, collection_id integer, key bigint);
+
+ALTER TABLE collection_users
+	ADD CONSTRAINT collection_users_fkey FOREIGN KEY (key, collection_id) REFERENCES collections_list (key, collection_id);
+
+-- sanity check for postgres
+INSERT INTO collections_list VALUES (1, 0, '1.1');
+INSERT INTO collection_users VALUES (1, 0, 1);
+
+-- should fail because of fkey
+INSERT INTO collection_users VALUES (1, 1000, 1);
+
+SELECT create_distributed_table('collections_list', 'key');
+SELECT create_distributed_table('collection_users', 'key');
+
+-- should still fail because of fkey
+INSERT INTO collection_users VALUES (1, 1000, 1);
+
+-- whereas new record with partition should go through
+INSERT INTO collections_list VALUES (2, 1, '1.2');
+INSERT INTO collection_users VALUES (5, 1, 2);
+
+-- AND CHAIN
+CREATE TABLE test (x int, y int);
+INSERT INTO test (x,y) SELECT i,i*3 from generate_series(1, 100) i;
+
+SELECT create_distributed_table('test', 'x');
+
+-- single shard queries with CHAIN
+BEGIN;
+UPDATE test SET y = 15 WHERE x = 1;
+COMMIT AND CHAIN;
+SELECT * FROM test WHERE x = 1;
+COMMIT;
+
+BEGIN;
+UPDATE test SET y = 20 WHERE x = 1;
+ROLLBACK AND CHAIN;
+SELECT * FROM test WHERE x = 1;
+COMMIT;
+
+-- multi shard queries with CHAIN
+BEGIN;
+UPDATE test SET y = 25;
+COMMIT AND CHAIN;
+SELECT DISTINCT y FROM test;
+COMMIT;
+
+BEGIN;
+UPDATE test SET y = 30;
+ROLLBACK AND CHAIN;
+SELECT DISTINCT y FROM test;
+COMMIT;
+
+-- does read only carry over?
+
+BEGIN READ ONLY;
+COMMIT AND CHAIN;
+UPDATE test SET y = 35;
+COMMIT;
+SELECT DISTINCT y FROM test;
+
+BEGIN READ ONLY;
+ROLLBACK AND CHAIN;
+UPDATE test SET y = 40;
+COMMIT;
+SELECT DISTINCT y FROM test;
+
+
+\set VERBOSITY terse
+drop schema test_pg12 cascade;
+\set VERBOSITY default
+
+SET citus.shard_replication_factor to 2;