From 693d4695d7295888adba59997e312b1b3a8eb0ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 14 Aug 2019 07:14:01 +0000 Subject: [PATCH] Create a test 'pg12' for pg12 features & error on unsupported new features Unsupported new features: COPY FROM WHERE, GENERATED ALWAYS AS, non-heap table access methods --- .../commands/create_distributed_table.c | 62 +++ src/backend/distributed/commands/multi_copy.c | 10 +- .../distributed/commands/utility_hook.c | 2 +- src/backend/distributed/test/blackhole_am.c | 502 ++++++++++++++++++ src/test/regress/expected/pg12.out | 368 +++++++++++++ src/test/regress/expected/pg12_1.out | 6 + src/test/regress/multi_schedule | 3 +- src/test/regress/sql/pg12.sql | 262 +++++++++ 8 files changed, 1211 insertions(+), 4 deletions(-) create mode 100644 src/backend/distributed/test/blackhole_am.c create mode 100644 src/test/regress/expected/pg12.out create mode 100644 src/test/regress/expected/pg12_1.out create mode 100644 src/test/regress/sql/pg12.sql diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index b1bb140aa..d144c2ada 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -21,12 +21,16 @@ #include "catalog/dependency.h" #include "catalog/index.h" #include "catalog/pg_am.h" +#include "catalog/pg_attribute.h" #if (PG_VERSION_NUM < 110000) #include "catalog/pg_constraint_fn.h" #endif #include "catalog/pg_enum.h" #include "catalog/pg_extension.h" #include "catalog/pg_opclass.h" +#if PG_VERSION_NUM >= 120000 +#include "catalog/pg_proc.h" +#endif #include "catalog/pg_trigger.h" #include "commands/defrem.h" #include "commands/extension.h" @@ -99,6 +103,8 @@ static bool LocalTableEmpty(Oid tableId); static void CopyLocalDataIntoShards(Oid relationId); static List * TupleDescColumnNameList(TupleDesc tupleDescriptor); static bool 
RelationUsesIdentityColumns(TupleDesc relationDesc); +static bool RelationUsesGeneratedStoredColumns(TupleDesc relationDesc); +static bool RelationUsesHeapAccessMethodOrNone(Relation relation); static bool CanUseExclusiveConnections(Oid relationId, bool localTableEmpty); /* exports for SQL callable functions */ @@ -645,6 +651,12 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn, relationDesc = RelationGetDescr(relation); relationName = RelationGetRelationName(relation); + if (!RelationUsesHeapAccessMethodOrNone(relation)) + { + ereport(ERROR, (errmsg( + "cannot distribute relations using non-heap access methods"))); + } + #if PG_VERSION_NUM < 120000 /* verify target relation does not use WITH (OIDS) PostgreSQL feature */ @@ -666,6 +678,15 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn, "... AS IDENTITY."))); } + /* verify target relation does not use generated columns */ + if (RelationUsesGeneratedStoredColumns(relationDesc)) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot distribute relation: %s", relationName), + errdetail("Distributed relations must not use GENERATED ALWAYS " + "AS (...) STORED."))); + } + /* check for support function needed by specified partition method */ if (distributionMethod == DISTRIBUTE_BY_HASH) { @@ -1374,3 +1395,44 @@ RelationUsesIdentityColumns(TupleDesc relationDesc) return false; } + + +/* + * RelationUsesGeneratedStoredColumns returns whether a given relation uses + * the GENERATED ALWAYS AS (...) STORED feature introduced as of PostgreSQL 12. 
+ */ +static bool +RelationUsesGeneratedStoredColumns(TupleDesc relationDesc) +{ +#if PG_VERSION_NUM >= 120000 + int attributeIndex = 0; + + for (attributeIndex = 0; attributeIndex < relationDesc->natts; attributeIndex++) + { + Form_pg_attribute attributeForm = TupleDescAttr(relationDesc, attributeIndex); + + if (attributeForm->attgenerated == ATTRIBUTE_GENERATED_STORED) + { + return true; + } + } +#endif + + return false; +} + + +/* + * Returns whether given relation uses default access method + */ +static bool +RelationUsesHeapAccessMethodOrNone(Relation relation) +{ +#if PG_VERSION_NUM >= 120000 + + return relation->rd_rel->relkind != RELKIND_RELATION || + relation->rd_amhandler == HEAP_TABLE_AM_HANDLER_OID; +#else + return true; +#endif +} diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 904fab1dc..defc08756 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -2803,6 +2803,14 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryS { if (copyStatement->is_from) { +#if PG_VERSION_NUM >= 120000 + if (copyStatement->whereClause) + { + ereport(ERROR, (errmsg( + "Citus does not support COPY FROM with WHERE"))); + } +#endif + /* check permissions, we're bypassing postgres' normal checks */ if (!isCopyFromWorker) { @@ -2812,7 +2820,7 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryS CitusCopyFrom(copyStatement, completionTag); return NULL; } - else if (!copyStatement->is_from) + else { /* * The copy code only handles SELECTs in COPY ... 
TO on master tables, diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index a8443a2e0..aad67cbfa 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -194,7 +194,7 @@ multi_ProcessUtility(PlannedStmt *pstmt, /* * TRANSMIT used to be separate command, but to avoid patching the grammar - * it's no overlaid onto COPY, but with FORMAT = 'transmit' instead of the + * it's now overlaid onto COPY, but with FORMAT = 'transmit' instead of the * normal FORMAT options. */ if (IsTransmitStmt(parsetree)) diff --git a/src/backend/distributed/test/blackhole_am.c b/src/backend/distributed/test/blackhole_am.c new file mode 100644 index 000000000..ac0e60ae1 --- /dev/null +++ b/src/backend/distributed/test/blackhole_am.c @@ -0,0 +1,502 @@ +/*------------------------------------------------------------------------- + * + * blackhole_am.c + * blackhole table access method code + * + * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * Copied from https://github.com/michaelpq/pg_plugins/blob/master/blackhole_am/blackhole_am.c + * + * + * NOTES + * This file introduces the table access method blackhole, which can + * be used as a template for other table access methods, and guarantees + * that any data inserted into it gets sent to the void. 
+ * + *------------------------------------------------------------------------- + */ + +/* *INDENT-OFF* */ +#include "postgres.h" + +#if PG_VERSION_NUM >= 120000 + +#include "access/tableam.h" +#include "access/heapam.h" +#include "access/amapi.h" +#include "catalog/index.h" +#include "commands/vacuum.h" +#include "executor/tuptable.h" + +PG_FUNCTION_INFO_V1(blackhole_am_handler); + +/* Base structures for scans */ +typedef struct BlackholeScanDescData +{ + TableScanDescData rs_base; /* AM independent part of the descriptor */ + + /* Add more fields here as needed by the AM. */ +} BlackholeScanDescData; +typedef struct BlackholeScanDescData *BlackholeScanDesc; + +static const TableAmRoutine blackhole_methods; + + +/* ------------------------------------------------------------------------ + * Slot related callbacks for blackhole AM + * ------------------------------------------------------------------------ + */ + +static const TupleTableSlotOps * +blackhole_slot_callbacks(Relation relation) +{ + /* + * Here you would most likely want to invent your own set of + * slot callbacks for your AM. 
+ */ + return &TTSOpsMinimalTuple; +} + +/* ------------------------------------------------------------------------ + * Table Scan Callbacks for blackhole AM + * ------------------------------------------------------------------------ + */ + +static TableScanDesc +blackhole_scan_begin(Relation relation, Snapshot snapshot, + int nkeys, ScanKey key, + ParallelTableScanDesc parallel_scan, + uint32 flags) +{ + BlackholeScanDesc scan; + + scan = (BlackholeScanDesc) palloc(sizeof(BlackholeScanDescData)); + + scan->rs_base.rs_rd = relation; + scan->rs_base.rs_snapshot = snapshot; + scan->rs_base.rs_nkeys = nkeys; + scan->rs_base.rs_flags = flags; + scan->rs_base.rs_parallel = parallel_scan; + + return (TableScanDesc) scan; } + +static void +blackhole_scan_end(TableScanDesc sscan) +{ + BlackholeScanDesc scan = (BlackholeScanDesc) sscan; + pfree(scan); +} + +static void +blackhole_scan_rescan(TableScanDesc sscan, ScanKey key, bool set_params, + bool allow_strat, bool allow_sync, bool allow_pagemode) +{ + /* nothing to do */ +} + +static bool +blackhole_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, + TupleTableSlot *slot) +{ + /* nothing to do */ + return false; +} + +/* ------------------------------------------------------------------------ + * Index Scan Callbacks for blackhole AM + * ------------------------------------------------------------------------ + */ + +static IndexFetchTableData * +blackhole_index_fetch_begin(Relation rel) +{ + return NULL; +} + +static void +blackhole_index_fetch_reset(IndexFetchTableData *scan) +{ + /* nothing to do here */ +} + +static void +blackhole_index_fetch_end(IndexFetchTableData *scan) +{ + /* nothing to do here */ +} + +static bool +blackhole_index_fetch_tuple(struct IndexFetchTableData *scan, + ItemPointer tid, + Snapshot snapshot, + TupleTableSlot *slot, + bool *call_again, bool *all_dead) +{ + /* there is no data */ + return 0; +} + + +/* ------------------------------------------------------------------------ 
+ * Callbacks for non-modifying operations on individual tuples for + * blackhole AM. + * ------------------------------------------------------------------------ + */ + +static bool +blackhole_fetch_row_version(Relation relation, + ItemPointer tid, + Snapshot snapshot, + TupleTableSlot *slot) +{ + /* nothing to do */ + return false; +} + +static void +blackhole_get_latest_tid(TableScanDesc sscan, + ItemPointer tid) +{ + /* nothing to do */ +} + +static bool +blackhole_tuple_tid_valid(TableScanDesc scan, ItemPointer tid) +{ + return false; +} + +static bool +blackhole_tuple_satisfies_snapshot(Relation rel, TupleTableSlot *slot, + Snapshot snapshot) +{ + return false; +} + +static TransactionId +blackhole_compute_xid_horizon_for_tuples(Relation rel, + ItemPointerData *tids, + int nitems) +{ + return InvalidTransactionId; +} + +/* ---------------------------------------------------------------------------- + * Functions for manipulations of physical tuples for blackhole AM. + * ---------------------------------------------------------------------------- + */ + +static void +blackhole_tuple_insert(Relation relation, TupleTableSlot *slot, + CommandId cid, int options, BulkInsertState bistate) +{ + /* nothing to do */ +} + +static void +blackhole_tuple_insert_speculative(Relation relation, TupleTableSlot *slot, + CommandId cid, int options, + BulkInsertState bistate, + uint32 specToken) +{ + /* nothing to do */ +} + +static void +blackhole_tuple_complete_speculative(Relation relation, TupleTableSlot *slot, + uint32 spekToken, bool succeeded) +{ + /* nothing to do */ +} + +static void +blackhole_multi_insert(Relation relation, TupleTableSlot **slots, + int ntuples, CommandId cid, int options, + BulkInsertState bistate) +{ + /* nothing to do */ +} + +static TM_Result +blackhole_tuple_delete(Relation relation, ItemPointer tid, CommandId cid, + Snapshot snapshot, Snapshot crosscheck, bool wait, + TM_FailureData *tmfd, bool changingPart) +{ + /* nothing to do, so it is 
always OK */ + return TM_Ok; +} + + +static TM_Result +blackhole_tuple_update(Relation relation, ItemPointer otid, + TupleTableSlot *slot, CommandId cid, + Snapshot snapshot, Snapshot crosscheck, + bool wait, TM_FailureData *tmfd, + LockTupleMode *lockmode, bool *update_indexes) +{ + /* nothing to do, so it is always OK */ + return TM_Ok; +} + +static TM_Result +blackhole_tuple_lock(Relation relation, ItemPointer tid, Snapshot snapshot, + TupleTableSlot *slot, CommandId cid, LockTupleMode mode, + LockWaitPolicy wait_policy, uint8 flags, + TM_FailureData *tmfd) +{ + /* nothing to do, so it is always OK */ + return TM_Ok; +} + +static void +blackhole_finish_bulk_insert(Relation relation, int options) +{ + /* nothing to do */ +} + + +/* ------------------------------------------------------------------------ + * DDL related callbacks for blackhole AM. + * ------------------------------------------------------------------------ + */ + +static void +blackhole_relation_set_new_filenode(Relation rel, + const RelFileNode *newrnode, + char persistence, + TransactionId *freezeXid, + MultiXactId *minmulti) +{ + /* nothing to do */ +} + +static void +blackhole_relation_nontransactional_truncate(Relation rel) +{ + /* nothing to do */ +} + +static void +blackhole_copy_data(Relation rel, const RelFileNode *newrnode) +{ + /* there is no data */ +} + +static void +blackhole_copy_for_cluster(Relation OldTable, Relation NewTable, + Relation OldIndex, bool use_sort, + TransactionId OldestXmin, + TransactionId *xid_cutoff, + MultiXactId *multi_cutoff, + double *num_tuples, + double *tups_vacuumed, + double *tups_recently_dead) +{ + /* no data, so nothing to do */ +} + +static void +blackhole_vacuum(Relation onerel, VacuumParams *params, + BufferAccessStrategy bstrategy) +{ + /* no data, so nothing to do */ +} + +static bool +blackhole_scan_analyze_next_block(TableScanDesc scan, BlockNumber blockno, + BufferAccessStrategy bstrategy) +{ + /* no data, so no point to analyze next block */ 
+ return false; +} + +static bool +blackhole_scan_analyze_next_tuple(TableScanDesc scan, TransactionId OldestXmin, + double *liverows, double *deadrows, + TupleTableSlot *slot) +{ + /* no data, so no point to analyze next tuple */ + return false; +} + +static double +blackhole_index_build_range_scan(Relation tableRelation, + Relation indexRelation, + IndexInfo *indexInfo, + bool allow_sync, + bool anyvisible, + bool progress, + BlockNumber start_blockno, + BlockNumber numblocks, + IndexBuildCallback callback, + void *callback_state, + TableScanDesc scan) +{ + /* no data, so no tuples */ + return 0; +} + +static void +blackhole_index_validate_scan(Relation tableRelation, + Relation indexRelation, + IndexInfo *indexInfo, + Snapshot snapshot, + ValidateIndexState *state) +{ + /* nothing to do */ +} + + +/* ------------------------------------------------------------------------ + * Miscellaneous callbacks for the blackhole AM + * ------------------------------------------------------------------------ + */ + +static uint64 +blackhole_relation_size(Relation rel, ForkNumber forkNumber) +{ + /* there is nothing */ + return 0; +} + +/* + * Check to see whether the table needs a TOAST table. 
+ */ +static bool +blackhole_relation_needs_toast_table(Relation rel) +{ + /* no data, so no toast table needed */ + return false; +} + + +/* ------------------------------------------------------------------------ + * Planner related callbacks for the blackhole AM + * ------------------------------------------------------------------------ + */ + +static void +blackhole_estimate_rel_size(Relation rel, int32 *attr_widths, + BlockNumber *pages, double *tuples, + double *allvisfrac) +{ + /* no data available */ + *attr_widths = 0; + *tuples = 0; + *allvisfrac = 0; + *pages = 0; +} + + +/* ------------------------------------------------------------------------ + * Executor related callbacks for the blackhole AM + * ------------------------------------------------------------------------ + */ + +static bool +blackhole_scan_bitmap_next_block(TableScanDesc scan, + TBMIterateResult *tbmres) +{ + /* no data, so no point to scan next block */ + return false; +} + +static bool +blackhole_scan_bitmap_next_tuple(TableScanDesc scan, + TBMIterateResult *tbmres, + TupleTableSlot *slot) +{ + /* no data, so no point to scan next tuple */ + return false; +} + +static bool +blackhole_scan_sample_next_block(TableScanDesc scan, + SampleScanState *scanstate) +{ + /* no data, so no point to scan next block for sampling */ + return false; +} + +static bool +blackhole_scan_sample_next_tuple(TableScanDesc scan, + SampleScanState *scanstate, + TupleTableSlot *slot) +{ + /* no data, so no point to scan next tuple for sampling */ + return false; +} + + +/* ------------------------------------------------------------------------ + * Definition of the blackhole table access method. 
+ * ------------------------------------------------------------------------ + */ + +static const TableAmRoutine blackhole_methods = { + .type = T_TableAmRoutine, + + .slot_callbacks = blackhole_slot_callbacks, + + .scan_begin = blackhole_scan_begin, + .scan_end = blackhole_scan_end, + .scan_rescan = blackhole_scan_rescan, + .scan_getnextslot = blackhole_scan_getnextslot, + + /* these are common helper functions */ + .parallelscan_estimate = table_block_parallelscan_estimate, + .parallelscan_initialize = table_block_parallelscan_initialize, + .parallelscan_reinitialize = table_block_parallelscan_reinitialize, + + .index_fetch_begin = blackhole_index_fetch_begin, + .index_fetch_reset = blackhole_index_fetch_reset, + .index_fetch_end = blackhole_index_fetch_end, + .index_fetch_tuple = blackhole_index_fetch_tuple, + + .tuple_insert = blackhole_tuple_insert, + .tuple_insert_speculative = blackhole_tuple_insert_speculative, + .tuple_complete_speculative = blackhole_tuple_complete_speculative, + .multi_insert = blackhole_multi_insert, + .tuple_delete = blackhole_tuple_delete, + .tuple_update = blackhole_tuple_update, + .tuple_lock = blackhole_tuple_lock, + .finish_bulk_insert = blackhole_finish_bulk_insert, + + .tuple_fetch_row_version = blackhole_fetch_row_version, + .tuple_get_latest_tid = blackhole_get_latest_tid, + .tuple_tid_valid = blackhole_tuple_tid_valid, + .tuple_satisfies_snapshot = blackhole_tuple_satisfies_snapshot, + .compute_xid_horizon_for_tuples = blackhole_compute_xid_horizon_for_tuples, + + .relation_set_new_filenode = blackhole_relation_set_new_filenode, + .relation_nontransactional_truncate = blackhole_relation_nontransactional_truncate, + .relation_copy_data = blackhole_copy_data, + .relation_copy_for_cluster = blackhole_copy_for_cluster, + .relation_vacuum = blackhole_vacuum, + .scan_analyze_next_block = blackhole_scan_analyze_next_block, + .scan_analyze_next_tuple = blackhole_scan_analyze_next_tuple, + .index_build_range_scan = 
blackhole_index_build_range_scan, + .index_validate_scan = blackhole_index_validate_scan, + + .relation_size = blackhole_relation_size, + .relation_needs_toast_table = blackhole_relation_needs_toast_table, + + .relation_estimate_size = blackhole_estimate_rel_size, + + .scan_bitmap_next_block = blackhole_scan_bitmap_next_block, + .scan_bitmap_next_tuple = blackhole_scan_bitmap_next_tuple, + .scan_sample_next_block = blackhole_scan_sample_next_block, + .scan_sample_next_tuple = blackhole_scan_sample_next_tuple +}; + + +Datum +blackhole_am_handler(PG_FUNCTION_ARGS) +{ + PG_RETURN_POINTER(&blackhole_methods); +} + +#endif diff --git a/src/test/regress/expected/pg12.out b/src/test/regress/expected/pg12.out new file mode 100644 index 000000000..be554a521 --- /dev/null +++ b/src/test/regress/expected/pg12.out @@ -0,0 +1,368 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int > 11 AS server_version_above_eleven +\gset +\if :server_version_above_eleven +\else +\q +\endif +SET citus.shard_replication_factor to 1; +SET citus.next_shard_id TO 60000; +SET citus.next_placement_id TO 60000; +create schema test_pg12; +set search_path to test_pg12; +CREATE FUNCTION blackhole_am_handler(internal) +RETURNS table_am_handler +AS 'citus' +LANGUAGE C; +CREATE ACCESS METHOD blackhole_am TYPE TABLE HANDLER blackhole_am_handler; +create table test_am(id int, val int) using blackhole_am; +insert into test_am values (1, 1); +-- Custom table access methods should be rejected +select create_distributed_table('test_am','id'); +ERROR: cannot distribute relations using non-heap access methods +-- Test generated columns +create table gen1 ( + id int, + val1 int, + val2 int GENERATED ALWAYS AS (val1 + 2) STORED +); +create table gen2 ( + id int, + val1 int, + val2 int GENERATED ALWAYS AS (val1 + 2) STORED +); +insert into gen1 (id, val1) values (1,4),(3,6),(5,2),(7,2); +insert into gen2 (id, val1) values (1,4),(3,6),(5,2),(7,2); +select * from 
create_distributed_table('gen1', 'id'); +ERROR: cannot distribute relation: gen1 +DETAIL: Distributed relations must not use GENERATED ALWAYS AS (...) STORED. +select * from create_distributed_table('gen2', 'val2'); +ERROR: cannot distribute relation: gen2 +DETAIL: Distributed relations must not use GENERATED ALWAYS AS (...) STORED. +insert into gen1 (id, val1) values (2,4),(4,6),(6,2),(8,2); +insert into gen2 (id, val1) values (2,4),(4,6),(6,2),(8,2); +select * from gen1; + id | val1 | val2 +----+------+------ + 1 | 4 | 6 + 3 | 6 | 8 + 5 | 2 | 4 + 7 | 2 | 4 + 2 | 4 | 6 + 4 | 6 | 8 + 6 | 2 | 4 + 8 | 2 | 4 +(8 rows) + +select * from gen2; + id | val1 | val2 +----+------+------ + 1 | 4 | 6 + 3 | 6 | 8 + 5 | 2 | 4 + 7 | 2 | 4 + 2 | 4 | 6 + 4 | 6 | 8 + 6 | 2 | 4 + 8 | 2 | 4 +(8 rows) + +-- Test new VACUUM/ANALYZE options +analyze (skip_locked) gen1; +vacuum (skip_locked) gen1; +vacuum (truncate 0) gen1; +vacuum (index_cleanup 1) gen1; +-- COPY FROM +create table cptest (id int, val int); +select create_distributed_table('cptest', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +copy cptest from STDIN with csv where val < 4; +ERROR: Citus does not support COPY FROM with WHERE +1,6 +2,3 +3,2 +4,9 +5,4 +\. +invalid command \. 
+select sum(id), sum(val) from cptest; +ERROR: syntax error at or near "1" +LINE 1: 1,6 + ^ +-- CTE materialized/not materialized +CREATE TABLE single_hash_repartition_first (id int, sum int, avg float); +CREATE TABLE single_hash_repartition_second (id int primary key, sum int, avg float); +SELECT create_distributed_table('single_hash_repartition_first', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_table('single_hash_repartition_second', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO single_hash_repartition_first +SELECT i, i * 3, i * 0.3 +FROM generate_series(0, 100) i; +INSERT INTO single_hash_repartition_second +SELECT i * 2, i * 5, i * 0.6 +FROM generate_series(0, 100) i; +-- a sample router query with NOT MATERIALIZED +-- which pushes down the filters to the CTE +SELECT public.coordinator_plan($Q$ +EXPLAIN (COSTS OFF) +WITH cte1 AS NOT MATERIALIZED +( + SELECT id + FROM single_hash_repartition_first t1 +) +SELECT count(*) +FROM cte1, single_hash_repartition_second +WHERE cte1.id = single_hash_repartition_second.id AND single_hash_repartition_second.id = 45; +$Q$); + coordinator_plan +------------------------------ + Custom Scan (Citus Adaptive) + Task Count: 1 +(2 rows) + +-- same query, without NOT MATERIALIZED, which is already default +-- which pushes down the filters to the CTE +SELECT public.coordinator_plan($Q$ +EXPLAIN (COSTS OFF) +WITH cte1 AS +( + SELECT id + FROM single_hash_repartition_first t1 +) +SELECT count(*) +FROM cte1, single_hash_repartition_second +WHERE cte1.id = single_hash_repartition_second.id AND single_hash_repartition_second.id = 45; +$Q$); + coordinator_plan +------------------------------ + Custom Scan (Citus Adaptive) + Task Count: 1 +(2 rows) + +-- same query with MATERIALIZED +-- which prevents pushing down filters to the CTE, +-- thus becoming a real-time query +SELECT public.coordinator_plan($Q$ +EXPLAIN (COSTS OFF) +WITH cte1 AS 
MATERIALIZED +( + SELECT id + FROM single_hash_repartition_first t1 +) +SELECT count(*) +FROM cte1, single_hash_repartition_second +WHERE cte1.id = single_hash_repartition_second.id AND single_hash_repartition_second.id = 45; +$Q$); + coordinator_plan +------------------------------------------ + Custom Scan (Citus Adaptive) + -> Distributed Subplan 5_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(4 rows) + +-- similar query with MATERIALIZED +-- now manually have the same filter in the CTE +-- thus becoming a router query again +SELECT public.coordinator_plan($Q$ +EXPLAIN (COSTS OFF) +WITH cte1 AS MATERIALIZED +( + SELECT id + FROM single_hash_repartition_first t1 + WHERE id = 45 +) +SELECT count(*) +FROM cte1, single_hash_repartition_second +WHERE cte1.id = single_hash_repartition_second.id AND single_hash_repartition_second.id = 45; +$Q$); + coordinator_plan +------------------------------ + Custom Scan (Citus Adaptive) + Task Count: 1 +(2 rows) + +-- now, have a real-time query without MATERIALIZED +-- these are sanitiy checks, because all of the CTEs are recursively +-- planned and there is no benefit that Citus can have +SELECT public.coordinator_plan($Q$ +EXPLAIN (COSTS OFF) +WITH cte1 AS MATERIALIZED +( + SELECT id + FROM single_hash_repartition_first t1 + WHERE sum = 45 +) +SELECT count(*) +FROM cte1, single_hash_repartition_second +WHERE cte1.id = single_hash_repartition_second.id AND single_hash_repartition_second.sum = 45; +$Q$); + coordinator_plan +------------------------------------------------ + Aggregate + -> Custom Scan (Citus Adaptive) + -> Distributed Subplan 8_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(5 rows) + +SELECT public.coordinator_plan($Q$ +EXPLAIN (COSTS OFF) +WITH cte1 AS NOT MATERIALIZED +( + SELECT id + FROM single_hash_repartition_first t1 + WHERE sum = 45 +) +SELECT count(*) +FROM cte1, single_hash_repartition_second +WHERE cte1.id = single_hash_repartition_second.id AND single_hash_repartition_second.sum = 45; 
+$Q$); + coordinator_plan +------------------------------------------------ + Aggregate + -> Custom Scan (Citus Adaptive) + -> Distributed Subplan 10_1 + -> Custom Scan (Citus Adaptive) + Task Count: 4 +(5 rows) + +-- Foreign keys to partition tables +CREATE TABLE collections_list ( + key bigint, + collection_id integer, + value numeric, + PRIMARY KEY(key, collection_id) +) PARTITION BY LIST (collection_id); +CREATE TABLE collections_list_0 + PARTITION OF collections_list (key, collection_id, value) + FOR VALUES IN ( 0 ); +CREATE TABLE collections_list_1 + PARTITION OF collections_list (key, collection_id, value) + FOR VALUES IN ( 1 ); +CREATE TABLE collection_users + (used_id integer, collection_id integer, key bigint); +ALTER TABLE collection_users + ADD CONSTRAINT collection_users_fkey FOREIGN KEY (key, collection_id) REFERENCES collections_list (key, collection_id); +-- sanity check for postgres +INSERT INTO collections_list VALUES (1, 0, '1.1'); +INSERT INTO collection_users VALUES (1, 0, 1); +-- should fail because of fkey +INSERT INTO collection_users VALUES (1, 1000, 1); +ERROR: insert or update on table "collection_users" violates foreign key constraint "collection_users_fkey" +DETAIL: Key (key, collection_id)=(1, 1000) is not present in table "collections_list". +SELECT create_distributed_table('collections_list', 'key'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +SELECT create_distributed_table('collection_users', 'key'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +-- should still fail because of fkey +INSERT INTO collection_users VALUES (1, 1000, 1); +ERROR: insert or update on table "collection_users_60024" violates foreign key constraint "collection_users_fkey_60024" +DETAIL: Key (key, collection_id)=(1, 1000) is not present in table "collections_list_60012". 
+CONTEXT: while executing command on localhost:57637 +-- whereas new record with partition should go through +INSERT INTO collections_list VALUES (2, 1, '1.2'); +INSERT INTO collection_users VALUES (5, 1, 2); +-- AND CHAIN +CREATE TABLE test (x int, y int); +INSERT INTO test (x,y) SELECT i,i*3 from generate_series(1, 100) i; +SELECT create_distributed_table('test', 'x'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +-- single shard queries with CHAIN +BEGIN; +UPDATE test SET y = 15 WHERE x = 1; +COMMIT AND CHAIN; +SELECT * FROM test WHERE x = 1; + x | y +---+---- + 1 | 15 +(1 row) + +COMMIT; +BEGIN; +UPDATE test SET y = 20 WHERE x = 1; +ROLLBACK AND CHAIN; +SELECT * FROM test WHERE x = 1; + x | y +---+---- + 1 | 15 +(1 row) + +COMMIT; +-- multi shard queries with CHAIN +BEGIN; +UPDATE test SET y = 25; +COMMIT AND CHAIN; +SELECT DISTINCT y FROM test; + y +---- + 25 +(1 row) + +COMMIT; +BEGIN; +UPDATE test SET y = 30; +ROLLBACK AND CHAIN; +SELECT DISTINCT y FROM test; + y +---- + 25 +(1 row) + +COMMIT; +-- does read only carry over? 
+BEGIN READ ONLY; +COMMIT AND CHAIN; +UPDATE test SET y = 35; +ERROR: cannot execute UPDATE in a read-only transaction +COMMIT; +SELECT DISTINCT y FROM test; + y +---- + 25 +(1 row) + +BEGIN READ ONLY; +ROLLBACK AND CHAIN; +UPDATE test SET y = 40; +ERROR: cannot execute UPDATE in a read-only transaction +COMMIT; +SELECT DISTINCT y FROM test; + y +---- + 25 +(1 row) + +\set VERBOSITY terse +drop schema test_pg12 cascade; +NOTICE: drop cascades to 11 other objects +\set VERBOSITY default +SET citus.shard_replication_factor to 2; diff --git a/src/test/regress/expected/pg12_1.out b/src/test/regress/expected/pg12_1.out new file mode 100644 index 000000000..9d92533f9 --- /dev/null +++ b/src/test/regress/expected/pg12_1.out @@ -0,0 +1,6 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int > 11 AS server_version_above_eleven +\gset +\if :server_version_above_eleven +\else +\q diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 68b7c0957..c095f4e3f 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -56,7 +56,7 @@ test: multi_partitioning_utils multi_partitioning replicated_partitioned_table test: subquery_basics subquery_local_tables subquery_executors subquery_and_cte set_operations set_operation_and_local_tables test: subqueries_deep subquery_view subquery_partitioning subquery_complex_target_list subqueries_not_supported subquery_in_where test: non_colocated_leaf_subquery_joins non_colocated_subquery_joins non_colocated_join_order -test: subquery_prepared_statements +test: subquery_prepared_statements pg12 # ---------- # Miscellaneous tests to check our query planning behavior @@ -276,4 +276,3 @@ test: multi_task_string_size # connection encryption tests # --------- test: ssl_by_default - diff --git a/src/test/regress/sql/pg12.sql b/src/test/regress/sql/pg12.sql new file mode 100644 index 000000000..3054724e0 --- /dev/null +++ b/src/test/regress/sql/pg12.sql @@ -0,0 +1,262 @@ 
+SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int > 11 AS server_version_above_eleven +\gset +\if :server_version_above_eleven +\else +\q +\endif + +SET citus.shard_replication_factor to 1; +SET citus.next_shard_id TO 60000; +SET citus.next_placement_id TO 60000; + +create schema test_pg12; +set search_path to test_pg12; + +CREATE FUNCTION blackhole_am_handler(internal) +RETURNS table_am_handler +AS 'citus' +LANGUAGE C; +CREATE ACCESS METHOD blackhole_am TYPE TABLE HANDLER blackhole_am_handler; + +create table test_am(id int, val int) using blackhole_am; +insert into test_am values (1, 1); +-- Custom table access methods should be rejected +select create_distributed_table('test_am','id'); + +-- Test generated columns +create table gen1 ( + id int, + val1 int, + val2 int GENERATED ALWAYS AS (val1 + 2) STORED +); +create table gen2 ( + id int, + val1 int, + val2 int GENERATED ALWAYS AS (val1 + 2) STORED +); + +insert into gen1 (id, val1) values (1,4),(3,6),(5,2),(7,2); +insert into gen2 (id, val1) values (1,4),(3,6),(5,2),(7,2); + +select * from create_distributed_table('gen1', 'id'); +select * from create_distributed_table('gen2', 'val2'); + +insert into gen1 (id, val1) values (2,4),(4,6),(6,2),(8,2); +insert into gen2 (id, val1) values (2,4),(4,6),(6,2),(8,2); + +select * from gen1; +select * from gen2; + +-- Test new VACUUM/ANALYZE options +analyze (skip_locked) gen1; +vacuum (skip_locked) gen1; +vacuum (truncate 0) gen1; +vacuum (index_cleanup 1) gen1; + +-- COPY FROM +create table cptest (id int, val int); +select create_distributed_table('cptest', 'id'); +copy cptest from STDIN with csv where val < 4; +1,6 +2,3 +3,2 +4,9 +5,4 +\. 
+select sum(id), sum(val) from cptest; + +-- CTE materialized/not materialized +CREATE TABLE single_hash_repartition_first (id int, sum int, avg float); +CREATE TABLE single_hash_repartition_second (id int primary key, sum int, avg float); + +SELECT create_distributed_table('single_hash_repartition_first', 'id'); +SELECT create_distributed_table('single_hash_repartition_second', 'id'); + +INSERT INTO single_hash_repartition_first +SELECT i, i * 3, i * 0.3 +FROM generate_series(0, 100) i; + +INSERT INTO single_hash_repartition_second +SELECT i * 2, i * 5, i * 0.6 +FROM generate_series(0, 100) i; + +-- a sample router query with NOT MATERIALIZED +-- which pushes down the filters to the CTE +SELECT public.coordinator_plan($Q$ +EXPLAIN (COSTS OFF) +WITH cte1 AS NOT MATERIALIZED +( + SELECT id + FROM single_hash_repartition_first t1 +) +SELECT count(*) +FROM cte1, single_hash_repartition_second +WHERE cte1.id = single_hash_repartition_second.id AND single_hash_repartition_second.id = 45; +$Q$); + +-- same query, without NOT MATERIALIZED, which is already default +-- which pushes down the filters to the CTE +SELECT public.coordinator_plan($Q$ +EXPLAIN (COSTS OFF) +WITH cte1 AS +( + SELECT id + FROM single_hash_repartition_first t1 +) +SELECT count(*) +FROM cte1, single_hash_repartition_second +WHERE cte1.id = single_hash_repartition_second.id AND single_hash_repartition_second.id = 45; +$Q$); + +-- same query with MATERIALIZED +-- which prevents pushing down filters to the CTE, +-- thus becoming a real-time query +SELECT public.coordinator_plan($Q$ +EXPLAIN (COSTS OFF) +WITH cte1 AS MATERIALIZED +( + SELECT id + FROM single_hash_repartition_first t1 +) +SELECT count(*) +FROM cte1, single_hash_repartition_second +WHERE cte1.id = single_hash_repartition_second.id AND single_hash_repartition_second.id = 45; +$Q$); + +-- similar query with MATERIALIZED +-- now manually have the same filter in the CTE +-- thus becoming a router query again +SELECT public.coordinator_plan($Q$ 
+EXPLAIN (COSTS OFF) +WITH cte1 AS MATERIALIZED +( + SELECT id + FROM single_hash_repartition_first t1 + WHERE id = 45 +) +SELECT count(*) +FROM cte1, single_hash_repartition_second +WHERE cte1.id = single_hash_repartition_second.id AND single_hash_repartition_second.id = 45; +$Q$); + +-- now, have a real-time query without MATERIALIZED +-- these are sanity checks, because all of the CTEs are recursively +-- planned and there is no benefit that Citus can have +SELECT public.coordinator_plan($Q$ +EXPLAIN (COSTS OFF) +WITH cte1 AS MATERIALIZED +( + SELECT id + FROM single_hash_repartition_first t1 + WHERE sum = 45 +) +SELECT count(*) +FROM cte1, single_hash_repartition_second +WHERE cte1.id = single_hash_repartition_second.id AND single_hash_repartition_second.sum = 45; +$Q$); + +SELECT public.coordinator_plan($Q$ +EXPLAIN (COSTS OFF) +WITH cte1 AS NOT MATERIALIZED +( + SELECT id + FROM single_hash_repartition_first t1 + WHERE sum = 45 +) +SELECT count(*) +FROM cte1, single_hash_repartition_second +WHERE cte1.id = single_hash_repartition_second.id AND single_hash_repartition_second.sum = 45; +$Q$); + +-- Foreign keys to partition tables +CREATE TABLE collections_list ( + key bigint, + collection_id integer, + value numeric, + PRIMARY KEY(key, collection_id) +) PARTITION BY LIST (collection_id); + +CREATE TABLE collections_list_0 + PARTITION OF collections_list (key, collection_id, value) + FOR VALUES IN ( 0 ); +CREATE TABLE collections_list_1 + PARTITION OF collections_list (key, collection_id, value) + FOR VALUES IN ( 1 ); + +CREATE TABLE collection_users + (used_id integer, collection_id integer, key bigint); + +ALTER TABLE collection_users + ADD CONSTRAINT collection_users_fkey FOREIGN KEY (key, collection_id) REFERENCES collections_list (key, collection_id); + +-- sanity check for postgres +INSERT INTO collections_list VALUES (1, 0, '1.1'); +INSERT INTO collection_users VALUES (1, 0, 1); + +-- should fail because of fkey +INSERT INTO collection_users VALUES (1, 
1000, 1); + +SELECT create_distributed_table('collections_list', 'key'); +SELECT create_distributed_table('collection_users', 'key'); + +-- should still fail because of fkey +INSERT INTO collection_users VALUES (1, 1000, 1); + +-- whereas new record with partition should go through +INSERT INTO collections_list VALUES (2, 1, '1.2'); +INSERT INTO collection_users VALUES (5, 1, 2); + +-- AND CHAIN +CREATE TABLE test (x int, y int); +INSERT INTO test (x,y) SELECT i,i*3 from generate_series(1, 100) i; + +SELECT create_distributed_table('test', 'x'); + +-- single shard queries with CHAIN +BEGIN; +UPDATE test SET y = 15 WHERE x = 1; +COMMIT AND CHAIN; +SELECT * FROM test WHERE x = 1; +COMMIT; + +BEGIN; +UPDATE test SET y = 20 WHERE x = 1; +ROLLBACK AND CHAIN; +SELECT * FROM test WHERE x = 1; +COMMIT; + +-- multi shard queries with CHAIN +BEGIN; +UPDATE test SET y = 25; +COMMIT AND CHAIN; +SELECT DISTINCT y FROM test; +COMMIT; + +BEGIN; +UPDATE test SET y = 30; +ROLLBACK AND CHAIN; +SELECT DISTINCT y FROM test; +COMMIT; + +-- does read only carry over? + +BEGIN READ ONLY; +COMMIT AND CHAIN; +UPDATE test SET y = 35; +COMMIT; +SELECT DISTINCT y FROM test; + +BEGIN READ ONLY; +ROLLBACK AND CHAIN; +UPDATE test SET y = 40; +COMMIT; +SELECT DISTINCT y FROM test; + + +\set VERBOSITY terse +drop schema test_pg12 cascade; +\set VERBOSITY default + +SET citus.shard_replication_factor to 2; +