mirror of https://github.com/citusdata/citus.git
Repartitioned INSERT/SELECT: Add a GUC to enable/disable it
parent
ce5eea4885
commit
a079278b0c
|
@ -48,6 +48,8 @@
|
||||||
#include "utils/rel.h"
|
#include "utils/rel.h"
|
||||||
#include "utils/snapmgr.h"
|
#include "utils/snapmgr.h"
|
||||||
|
|
||||||
|
/* Config variables managed via guc.c */
|
||||||
|
bool EnableRepartitionedInsertSelect = true;
|
||||||
|
|
||||||
/* depth of current insert/select executor. */
|
/* depth of current insert/select executor. */
|
||||||
static int insertSelectExecutorLevel = 0;
|
static int insertSelectExecutorLevel = 0;
|
||||||
|
@ -1031,6 +1033,11 @@ PartitionColumnIndex(List *insertTargetList, Var *partitionColumn)
|
||||||
bool
|
bool
|
||||||
IsRedistributablePlan(Plan *selectPlan)
|
IsRedistributablePlan(Plan *selectPlan)
|
||||||
{
|
{
|
||||||
|
if (!EnableRepartitionedInsertSelect)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
/* don't redistribute if query is not distributed or requires merge on coordinator */
|
/* don't redistribute if query is not distributed or requires merge on coordinator */
|
||||||
if (!IsCitusCustomScan(selectPlan))
|
if (!IsCitusCustomScan(selectPlan))
|
||||||
{
|
{
|
||||||
|
|
|
@ -32,6 +32,7 @@
|
||||||
#include "distributed/connection_management.h"
|
#include "distributed/connection_management.h"
|
||||||
#include "distributed/cte_inline.h"
|
#include "distributed/cte_inline.h"
|
||||||
#include "distributed/distributed_deadlock_detection.h"
|
#include "distributed/distributed_deadlock_detection.h"
|
||||||
|
#include "distributed/insert_select_executor.h"
|
||||||
#include "distributed/intermediate_result_pruning.h"
|
#include "distributed/intermediate_result_pruning.h"
|
||||||
#include "distributed/local_executor.h"
|
#include "distributed/local_executor.h"
|
||||||
#include "distributed/maintenanced.h"
|
#include "distributed/maintenanced.h"
|
||||||
|
@ -457,6 +458,16 @@ RegisterCitusConfigVariables(void)
|
||||||
GUC_NO_SHOW_ALL,
|
GUC_NO_SHOW_ALL,
|
||||||
NULL, NULL, NULL);
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
|
DefineCustomBoolVariable(
|
||||||
|
"citus.enable_repartitioned_insert_select",
|
||||||
|
gettext_noop("Enables repartitioned INSERT/SELECTs"),
|
||||||
|
NULL,
|
||||||
|
&EnableRepartitionedInsertSelect,
|
||||||
|
true,
|
||||||
|
PGC_USERSET,
|
||||||
|
GUC_NO_SHOW_ALL,
|
||||||
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
DefineCustomBoolVariable(
|
DefineCustomBoolVariable(
|
||||||
"citus.enable_fast_path_router_planner",
|
"citus.enable_fast_path_router_planner",
|
||||||
gettext_noop("Enables fast path router planner"),
|
gettext_noop("Enables fast path router planner"),
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
|
|
||||||
#include "executor/execdesc.h"
|
#include "executor/execdesc.h"
|
||||||
|
|
||||||
|
extern bool EnableRepartitionedInsertSelect;
|
||||||
|
|
||||||
extern TupleTableSlot * CoordinatorInsertSelectExecScan(CustomScanState *node);
|
extern TupleTableSlot * CoordinatorInsertSelectExecScan(CustomScanState *node);
|
||||||
extern bool ExecutingInsertSelect(void);
|
extern bool ExecutingInsertSelect(void);
|
||||||
|
|
|
@ -791,7 +791,6 @@ INSERT INTO test_table
|
||||||
FROM
|
FROM
|
||||||
fist_table_cte;
|
fist_table_cte;
|
||||||
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
|
DEBUG: distributed INSERT ... SELECT can only select from distributed tables
|
||||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
|
||||||
DEBUG: CTE fist_table_cte is going to be inlined via distributed planning
|
DEBUG: CTE fist_table_cte is going to be inlined via distributed planning
|
||||||
DEBUG: Router planner cannot handle multi-shard select queries
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
DEBUG: Router planner cannot handle multi-shard select queries
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
|
@ -800,6 +799,7 @@ DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT key, value FROM (SELECT fist_table_cte.key, fist_table_cte.value FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.other_value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, other_value jsonb)) fist_table_cte) citus_insert_select_subquery
|
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT key, value FROM (SELECT fist_table_cte.key, fist_table_cte.value FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.other_value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, other_value jsonb)) fist_table_cte) citus_insert_select_subquery
|
||||||
DEBUG: Creating router plan
|
DEBUG: Creating router plan
|
||||||
DEBUG: Plan is router executable
|
DEBUG: Plan is router executable
|
||||||
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
-- the following INSERT..SELECT is even more interesting
|
-- the following INSERT..SELECT is even more interesting
|
||||||
-- the CTE becomes pushdownable
|
-- the CTE becomes pushdownable
|
||||||
INSERT INTO test_table
|
INSERT INTO test_table
|
||||||
|
|
|
@ -875,6 +875,50 @@ SELECT count(*) FROM target_table;
|
||||||
10
|
10
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Disable repartitioned insert/select
|
||||||
|
--
|
||||||
|
TRUNCATE target_table;
|
||||||
|
SET citus.enable_repartitioned_insert_select TO OFF;
|
||||||
|
EXPLAIN (costs off) INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus INSERT ... SELECT)
|
||||||
|
INSERT/SELECT method: pull to coordinator
|
||||||
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Seq Scan on source_table_4213629 source_table
|
||||||
|
(8 rows)
|
||||||
|
|
||||||
|
SET client_min_messages TO DEBUG2;
|
||||||
|
INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table;
|
||||||
|
DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT
|
||||||
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
|
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||||
|
RESET client_min_messages;
|
||||||
|
SELECT count(*) FROM target_table;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
10
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET citus.enable_repartitioned_insert_select TO ON;
|
||||||
|
EXPLAIN (costs off) INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus INSERT ... SELECT)
|
||||||
|
INSERT/SELECT method: repartition
|
||||||
|
-> Custom Scan (Citus Adaptive)
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Seq Scan on source_table_4213629 source_table
|
||||||
|
(8 rows)
|
||||||
|
|
||||||
DROP TABLE source_table, target_table;
|
DROP TABLE source_table, target_table;
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
DROP SCHEMA insert_select_repartition CASCADE;
|
DROP SCHEMA insert_select_repartition CASCADE;
|
||||||
|
|
|
@ -418,6 +418,24 @@ RESET client_min_messages;
|
||||||
|
|
||||||
SELECT count(*) FROM target_table;
|
SELECT count(*) FROM target_table;
|
||||||
|
|
||||||
|
--
|
||||||
|
-- Disable repartitioned insert/select
|
||||||
|
--
|
||||||
|
|
||||||
|
TRUNCATE target_table;
|
||||||
|
SET citus.enable_repartitioned_insert_select TO OFF;
|
||||||
|
|
||||||
|
EXPLAIN (costs off) INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table;
|
||||||
|
|
||||||
|
SET client_min_messages TO DEBUG2;
|
||||||
|
INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table;
|
||||||
|
RESET client_min_messages;
|
||||||
|
|
||||||
|
SELECT count(*) FROM target_table;
|
||||||
|
|
||||||
|
SET citus.enable_repartitioned_insert_select TO ON;
|
||||||
|
EXPLAIN (costs off) INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table;
|
||||||
|
|
||||||
DROP TABLE source_table, target_table;
|
DROP TABLE source_table, target_table;
|
||||||
|
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
|
|
Loading…
Reference in New Issue