mirror of https://github.com/citusdata/citus.git
Prevent planner from choosing parallel scan for columnar tables (#5245)
Previously, for regular table scans, we were setting `RelOptInfo->partial_pathlist`
to `NIL` via `set_rel_pathlist_hook` to discard scan `Path`s that need to use any
parallel workers, this was working nicely.
However, when building indexes, this hook doesn't get called so we were not
able to prevent spawning parallel workers when building an index. For this
reason, 9b4dc2f804
added basic
implementation for `columnar_parallelscan_*` callbacks but also made some
changes to skip using those workers when building the index.
However, now that we are doing stripe reservation in two stages, we call
`heap_inplace_update` at some point to complete stripe reservation.
However, postgres throws an error if we call `heap_inplace_update` during
a parallel operation, even if we don't actually make use of those workers.
For this reason, with this pr, we make sure to not generate scan `Path`s that
need to use any parallel workers by using `get_relation_info_hook`.
This is indeed useful to prevent spawning parallel workers during index builds.
pull/5255/head
parent
5825c44d5f
commit
3340f17c4e
|
@ -31,6 +31,7 @@
|
||||||
#include "optimizer/optimizer.h"
|
#include "optimizer/optimizer.h"
|
||||||
#include "optimizer/pathnode.h"
|
#include "optimizer/pathnode.h"
|
||||||
#include "optimizer/paths.h"
|
#include "optimizer/paths.h"
|
||||||
|
#include "optimizer/plancat.h"
|
||||||
#include "optimizer/restrictinfo.h"
|
#include "optimizer/restrictinfo.h"
|
||||||
#include "utils/builtins.h"
|
#include "utils/builtins.h"
|
||||||
#include "utils/lsyscache.h"
|
#include "utils/lsyscache.h"
|
||||||
|
@ -95,6 +96,8 @@ static void AddColumnarScanPathsRec(PlannerInfo *root, RelOptInfo *rel,
|
||||||
/* hooks and callbacks */
|
/* hooks and callbacks */
|
||||||
static void ColumnarSetRelPathlistHook(PlannerInfo *root, RelOptInfo *rel, Index rti,
|
static void ColumnarSetRelPathlistHook(PlannerInfo *root, RelOptInfo *rel, Index rti,
|
||||||
RangeTblEntry *rte);
|
RangeTblEntry *rte);
|
||||||
|
static void ColumnarGetRelationInfoHook(PlannerInfo *root, Oid relationObjectId,
|
||||||
|
bool inhparent, RelOptInfo *rel);
|
||||||
static Plan * ColumnarScanPath_PlanCustomPath(PlannerInfo *root,
|
static Plan * ColumnarScanPath_PlanCustomPath(PlannerInfo *root,
|
||||||
RelOptInfo *rel,
|
RelOptInfo *rel,
|
||||||
struct CustomPath *best_path,
|
struct CustomPath *best_path,
|
||||||
|
@ -126,6 +129,7 @@ static Bitmapset * ColumnarAttrNeeded(ScanState *ss);
|
||||||
|
|
||||||
/* saved hook value in case of unload */
|
/* saved hook value in case of unload */
|
||||||
static set_rel_pathlist_hook_type PreviousSetRelPathlistHook = NULL;
|
static set_rel_pathlist_hook_type PreviousSetRelPathlistHook = NULL;
|
||||||
|
static get_relation_info_hook_type PreviousGetRelationInfoHook = NULL;
|
||||||
|
|
||||||
static bool EnableColumnarCustomScan = true;
|
static bool EnableColumnarCustomScan = true;
|
||||||
static bool EnableColumnarQualPushdown = true;
|
static bool EnableColumnarQualPushdown = true;
|
||||||
|
@ -180,6 +184,9 @@ columnar_customscan_init()
|
||||||
PreviousSetRelPathlistHook = set_rel_pathlist_hook;
|
PreviousSetRelPathlistHook = set_rel_pathlist_hook;
|
||||||
set_rel_pathlist_hook = ColumnarSetRelPathlistHook;
|
set_rel_pathlist_hook = ColumnarSetRelPathlistHook;
|
||||||
|
|
||||||
|
PreviousGetRelationInfoHook = get_relation_info_hook;
|
||||||
|
get_relation_info_hook = ColumnarGetRelationInfoHook;
|
||||||
|
|
||||||
/* register customscan specific GUC's */
|
/* register customscan specific GUC's */
|
||||||
DefineCustomBoolVariable(
|
DefineCustomBoolVariable(
|
||||||
"columnar.enable_custom_scan",
|
"columnar.enable_custom_scan",
|
||||||
|
@ -274,8 +281,14 @@ ColumnarSetRelPathlistHook(PlannerInfo *root, RelOptInfo *rel, Index rti,
|
||||||
errmsg("sample scans not supported on columnar tables")));
|
errmsg("sample scans not supported on columnar tables")));
|
||||||
}
|
}
|
||||||
|
|
||||||
/* columnar doesn't support parallel paths */
|
if (list_length(rel->partial_pathlist) != 0)
|
||||||
rel->partial_pathlist = NIL;
|
{
|
||||||
|
/*
|
||||||
|
* Parallel scans on columnar tables are already discardad by
|
||||||
|
* ColumnarGetRelationInfoHook but be on the safe side.
|
||||||
|
*/
|
||||||
|
elog(ERROR, "parallel scans on columnar are not supported");
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* There are cases where IndexPath is normally more preferrable over
|
* There are cases where IndexPath is normally more preferrable over
|
||||||
|
@ -318,6 +331,23 @@ ColumnarSetRelPathlistHook(PlannerInfo *root, RelOptInfo *rel, Index rti,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void
|
||||||
|
ColumnarGetRelationInfoHook(PlannerInfo *root, Oid relationObjectId,
|
||||||
|
bool inhparent, RelOptInfo *rel)
|
||||||
|
{
|
||||||
|
if (PreviousGetRelationInfoHook)
|
||||||
|
{
|
||||||
|
PreviousGetRelationInfoHook(root, relationObjectId, inhparent, rel);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IsColumnarTableAmTable(relationObjectId))
|
||||||
|
{
|
||||||
|
/* disable parallel query */
|
||||||
|
rel->rel_parallel_workers = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* RemovePathsByPredicate removes the paths that removePathPredicate
|
* RemovePathsByPredicate removes the paths that removePathPredicate
|
||||||
* evaluates to true from pathlist of given rel.
|
* evaluates to true from pathlist of given rel.
|
||||||
|
|
|
@ -436,33 +436,21 @@ ErrorIfInvalidRowNumber(uint64 rowNumber)
|
||||||
static Size
|
static Size
|
||||||
columnar_parallelscan_estimate(Relation rel)
|
columnar_parallelscan_estimate(Relation rel)
|
||||||
{
|
{
|
||||||
return sizeof(ParallelBlockTableScanDescData);
|
elog(ERROR, "columnar_parallelscan_estimate not implemented");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static Size
|
static Size
|
||||||
columnar_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan)
|
columnar_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan)
|
||||||
{
|
{
|
||||||
ParallelBlockTableScanDesc bpscan = (ParallelBlockTableScanDesc) pscan;
|
elog(ERROR, "columnar_parallelscan_initialize not implemented");
|
||||||
|
|
||||||
bpscan->base.phs_relid = RelationGetRelid(rel);
|
|
||||||
bpscan->phs_nblocks = RelationGetNumberOfBlocks(rel);
|
|
||||||
bpscan->base.phs_syncscan = synchronize_seqscans &&
|
|
||||||
!RelationUsesLocalBuffers(rel) &&
|
|
||||||
bpscan->phs_nblocks > NBuffers / 4;
|
|
||||||
SpinLockInit(&bpscan->phs_mutex);
|
|
||||||
bpscan->phs_startblock = InvalidBlockNumber;
|
|
||||||
pg_atomic_init_u64(&bpscan->phs_nallocated, 0);
|
|
||||||
|
|
||||||
return sizeof(ParallelBlockTableScanDescData);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
columnar_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan)
|
columnar_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan)
|
||||||
{
|
{
|
||||||
ParallelBlockTableScanDesc bpscan = (ParallelBlockTableScanDesc) pscan;
|
elog(ERROR, "columnar_parallelscan_reinitialize not implemented");
|
||||||
pg_atomic_write_u64(&bpscan->phs_nallocated, 0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1293,22 +1281,10 @@ columnar_index_build_range_scan(Relation columnarRelation,
|
||||||
if (scan)
|
if (scan)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Scan is initialized iff postgres decided to build the index using
|
* Parallel scans on columnar tables are already discardad by
|
||||||
* parallel workers. In this case, we simply return for parallel
|
* ColumnarGetRelationInfoHook but be on the safe side.
|
||||||
* workers since we don't support parallel scan on columnar tables.
|
|
||||||
*/
|
*/
|
||||||
if (IsBackgroundWorker)
|
elog(ERROR, "parallel scans on columnar are not supported");
|
||||||
{
|
|
||||||
ereport(DEBUG4, (errmsg("ignoring parallel worker when building "
|
|
||||||
"index since parallel scan on columnar "
|
|
||||||
"tables is not supported")));
|
|
||||||
table_endscan(scan);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
ereport(NOTICE, (errmsg("falling back to serial index build since "
|
|
||||||
"parallel scan on columnar tables is not "
|
|
||||||
"supported")));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -1326,8 +1302,7 @@ columnar_index_build_range_scan(Relation columnarRelation,
|
||||||
|
|
||||||
Snapshot snapshot = { 0 };
|
Snapshot snapshot = { 0 };
|
||||||
bool snapshotRegisteredByUs = false;
|
bool snapshotRegisteredByUs = false;
|
||||||
if (!scan)
|
|
||||||
{
|
|
||||||
/*
|
/*
|
||||||
* For serial index build, we begin our own scan. We may also need to
|
* For serial index build, we begin our own scan. We may also need to
|
||||||
* register a snapshot whose lifetime is under our direct control.
|
* register a snapshot whose lifetime is under our direct control.
|
||||||
|
@ -1347,18 +1322,6 @@ columnar_index_build_range_scan(Relation columnarRelation,
|
||||||
bool allowAccessStrategy = true;
|
bool allowAccessStrategy = true;
|
||||||
scan = table_beginscan_strat(columnarRelation, snapshot, nkeys, scanKey,
|
scan = table_beginscan_strat(columnarRelation, snapshot, nkeys, scanKey,
|
||||||
allowAccessStrategy, allow_sync);
|
allowAccessStrategy, allow_sync);
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* For parallel index build, we don't register/unregister own snapshot
|
|
||||||
* since snapshot is taken from parallel scan. Note that even if we
|
|
||||||
* don't support parallel index builds, we still continue building the
|
|
||||||
* index via the main backend and we should still rely on the snapshot
|
|
||||||
* provided by parallel scan.
|
|
||||||
*/
|
|
||||||
snapshot = scan->rs_snapshot;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (progress)
|
if (progress)
|
||||||
{
|
{
|
||||||
|
@ -2113,17 +2076,6 @@ static const TableAmRoutine columnar_am_methods = {
|
||||||
.scan_rescan = columnar_rescan,
|
.scan_rescan = columnar_rescan,
|
||||||
.scan_getnextslot = columnar_getnextslot,
|
.scan_getnextslot = columnar_getnextslot,
|
||||||
|
|
||||||
/*
|
|
||||||
* Postgres calls following three callbacks during index builds, if it
|
|
||||||
* decides to use parallel workers when building the index. On the other
|
|
||||||
* hand, we don't support parallel scans on columnar tables but we also
|
|
||||||
* want to fallback to serial index build. For this reason, we both skip
|
|
||||||
* parallel workers in columnar_index_build_range_scan and also provide
|
|
||||||
* basic implementations for those callbacks based on their corresponding
|
|
||||||
* implementations in heapAM.
|
|
||||||
* Note that for regular query plans, we already ignore parallel paths via
|
|
||||||
* ColumnarSetRelPathlistHook.
|
|
||||||
*/
|
|
||||||
.parallelscan_estimate = columnar_parallelscan_estimate,
|
.parallelscan_estimate = columnar_parallelscan_estimate,
|
||||||
.parallelscan_initialize = columnar_parallelscan_initialize,
|
.parallelscan_initialize = columnar_parallelscan_initialize,
|
||||||
.parallelscan_reinitialize = columnar_parallelscan_reinitialize,
|
.parallelscan_reinitialize = columnar_parallelscan_reinitialize,
|
||||||
|
|
|
@ -799,6 +799,26 @@ DEBUG: pathlist hook for columnar table am
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
RESET client_min_messages;
|
RESET client_min_messages;
|
||||||
|
create table events (event_id bigserial, event_time timestamptz default now(), payload text);
|
||||||
|
create index on events (event_id);
|
||||||
|
insert into events (payload) select 'hello-'||s from generate_series(1,10) s;
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL force_parallel_mode = regress;
|
||||||
|
SET LOCAL min_parallel_table_scan_size = 1;
|
||||||
|
SET LOCAL parallel_tuple_cost = 0;
|
||||||
|
SET LOCAL max_parallel_workers = 4;
|
||||||
|
SET LOCAL max_parallel_workers_per_gather = 4;
|
||||||
|
select alter_table_set_access_method('events', 'columnar');
|
||||||
|
NOTICE: creating a new table for alter_table_set_access_method.events
|
||||||
|
NOTICE: moving the data of alter_table_set_access_method.events
|
||||||
|
NOTICE: dropping the old alter_table_set_access_method.events
|
||||||
|
NOTICE: renaming the new table to alter_table_set_access_method.events
|
||||||
|
alter_table_set_access_method
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COMMIT;
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
DROP SCHEMA alter_table_set_access_method CASCADE;
|
DROP SCHEMA alter_table_set_access_method CASCADE;
|
||||||
SELECT 1 FROM master_remove_node('localhost', :master_port);
|
SELECT 1 FROM master_remove_node('localhost', :master_port);
|
||||||
|
|
|
@ -448,16 +448,10 @@ ERROR: unsupported access method for the index on columnar table brin_summarize
|
||||||
CREATE TABLE parallel_scan_test(a int) USING columnar WITH ( parallel_workers = 2 );
|
CREATE TABLE parallel_scan_test(a int) USING columnar WITH ( parallel_workers = 2 );
|
||||||
INSERT INTO parallel_scan_test SELECT i FROM generate_series(1,10) i;
|
INSERT INTO parallel_scan_test SELECT i FROM generate_series(1,10) i;
|
||||||
CREATE INDEX ON parallel_scan_test (a);
|
CREATE INDEX ON parallel_scan_test (a);
|
||||||
NOTICE: falling back to serial index build since parallel scan on columnar tables is not supported
|
|
||||||
VACUUM FULL parallel_scan_test;
|
VACUUM FULL parallel_scan_test;
|
||||||
NOTICE: falling back to serial index build since parallel scan on columnar tables is not supported
|
|
||||||
REINDEX TABLE parallel_scan_test;
|
REINDEX TABLE parallel_scan_test;
|
||||||
NOTICE: falling back to serial index build since parallel scan on columnar tables is not supported
|
|
||||||
CREATE INDEX CONCURRENTLY ON parallel_scan_test (a);
|
CREATE INDEX CONCURRENTLY ON parallel_scan_test (a);
|
||||||
NOTICE: falling back to serial index build since parallel scan on columnar tables is not supported
|
|
||||||
REINDEX TABLE CONCURRENTLY parallel_scan_test;
|
REINDEX TABLE CONCURRENTLY parallel_scan_test;
|
||||||
NOTICE: falling back to serial index build since parallel scan on columnar tables is not supported
|
|
||||||
NOTICE: falling back to serial index build since parallel scan on columnar tables is not supported
|
|
||||||
-- test with different data types & indexAM's --
|
-- test with different data types & indexAM's --
|
||||||
CREATE TABLE hash_text(a INT, b TEXT) USING columnar;
|
CREATE TABLE hash_text(a INT, b TEXT) USING columnar;
|
||||||
INSERT INTO hash_text SELECT i, (i*2)::TEXT FROM generate_series(1, 10) i;
|
INSERT INTO hash_text SELECT i, (i*2)::TEXT FROM generate_series(1, 10) i;
|
||||||
|
@ -550,5 +544,27 @@ ERROR: duplicate key value violates unique constraint "aborted_write_test_pkey"
|
||||||
DETAIL: Key (a)=(16999) already exists.
|
DETAIL: Key (a)=(16999) already exists.
|
||||||
-- since second INSERT already failed, should not throw a "duplicate key" error
|
-- since second INSERT already failed, should not throw a "duplicate key" error
|
||||||
REINDEX TABLE aborted_write_test;
|
REINDEX TABLE aborted_write_test;
|
||||||
|
create table events (event_id bigserial, event_time timestamptz default now(), payload text) using columnar;
|
||||||
|
BEGIN;
|
||||||
|
-- this wouldn't flush any data
|
||||||
|
insert into events (payload) select 'hello-'||s from generate_series(1, 10) s;
|
||||||
|
-- Since table is large enough, normally postgres would prefer using
|
||||||
|
-- parallel workers when building the index.
|
||||||
|
--
|
||||||
|
-- However, before starting to build the index, we will first flush
|
||||||
|
-- the writes of above INSERT and this would try to update the stripe
|
||||||
|
-- reservation entry in columnar.stripe when doing that.
|
||||||
|
--
|
||||||
|
-- However, updating a tuple during a parallel operation is not allowed
|
||||||
|
-- by postgres and throws an error. For this reason, here we don't expect
|
||||||
|
-- following commnad to fail since we prevent using parallel workers for
|
||||||
|
-- columnar tables.
|
||||||
|
SET LOCAL force_parallel_mode = regress;
|
||||||
|
SET LOCAL min_parallel_table_scan_size = 1;
|
||||||
|
SET LOCAL parallel_tuple_cost = 0;
|
||||||
|
SET LOCAL max_parallel_workers = 4;
|
||||||
|
SET LOCAL max_parallel_workers_per_gather = 4;
|
||||||
|
create index on events (event_id);
|
||||||
|
COMMIT;
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
DROP SCHEMA columnar_indexes CASCADE;
|
DROP SCHEMA columnar_indexes CASCADE;
|
||||||
|
|
|
@ -264,6 +264,19 @@ SELECT alter_table_set_access_method('abcde_012345678901234567890123456789012345
|
||||||
SELECT * FROM abcde_0123456789012345678901234567890123456789012345678901234567890123456789;
|
SELECT * FROM abcde_0123456789012345678901234567890123456789012345678901234567890123456789;
|
||||||
RESET client_min_messages;
|
RESET client_min_messages;
|
||||||
|
|
||||||
|
create table events (event_id bigserial, event_time timestamptz default now(), payload text);
|
||||||
|
create index on events (event_id);
|
||||||
|
insert into events (payload) select 'hello-'||s from generate_series(1,10) s;
|
||||||
|
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL force_parallel_mode = regress;
|
||||||
|
SET LOCAL min_parallel_table_scan_size = 1;
|
||||||
|
SET LOCAL parallel_tuple_cost = 0;
|
||||||
|
SET LOCAL max_parallel_workers = 4;
|
||||||
|
SET LOCAL max_parallel_workers_per_gather = 4;
|
||||||
|
select alter_table_set_access_method('events', 'columnar');
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
DROP SCHEMA alter_table_set_access_method CASCADE;
|
DROP SCHEMA alter_table_set_access_method CASCADE;
|
||||||
SELECT 1 FROM master_remove_node('localhost', :master_port);
|
SELECT 1 FROM master_remove_node('localhost', :master_port);
|
||||||
|
|
|
@ -406,5 +406,29 @@ INSERT INTO aborted_write_test VALUES (16999);
|
||||||
-- since second INSERT already failed, should not throw a "duplicate key" error
|
-- since second INSERT already failed, should not throw a "duplicate key" error
|
||||||
REINDEX TABLE aborted_write_test;
|
REINDEX TABLE aborted_write_test;
|
||||||
|
|
||||||
|
create table events (event_id bigserial, event_time timestamptz default now(), payload text) using columnar;
|
||||||
|
BEGIN;
|
||||||
|
-- this wouldn't flush any data
|
||||||
|
insert into events (payload) select 'hello-'||s from generate_series(1, 10) s;
|
||||||
|
|
||||||
|
-- Since table is large enough, normally postgres would prefer using
|
||||||
|
-- parallel workers when building the index.
|
||||||
|
--
|
||||||
|
-- However, before starting to build the index, we will first flush
|
||||||
|
-- the writes of above INSERT and this would try to update the stripe
|
||||||
|
-- reservation entry in columnar.stripe when doing that.
|
||||||
|
--
|
||||||
|
-- However, updating a tuple during a parallel operation is not allowed
|
||||||
|
-- by postgres and throws an error. For this reason, here we don't expect
|
||||||
|
-- following commnad to fail since we prevent using parallel workers for
|
||||||
|
-- columnar tables.
|
||||||
|
SET LOCAL force_parallel_mode = regress;
|
||||||
|
SET LOCAL min_parallel_table_scan_size = 1;
|
||||||
|
SET LOCAL parallel_tuple_cost = 0;
|
||||||
|
SET LOCAL max_parallel_workers = 4;
|
||||||
|
SET LOCAL max_parallel_workers_per_gather = 4;
|
||||||
|
create index on events (event_id);
|
||||||
|
COMMIT;
|
||||||
|
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
DROP SCHEMA columnar_indexes CASCADE;
|
DROP SCHEMA columnar_indexes CASCADE;
|
||||||
|
|
Loading…
Reference in New Issue