diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index bc9dfdc9f..179f00287 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -48,6 +48,8 @@ #include "utils/rel.h" #include "utils/snapmgr.h" +/* Config variables managed via guc.c */ +bool EnableRepartitionedInsertSelect = true; /* depth of current insert/select executor. */ static int insertSelectExecutorLevel = 0; @@ -1031,6 +1033,11 @@ PartitionColumnIndex(List *insertTargetList, Var *partitionColumn) bool IsRedistributablePlan(Plan *selectPlan) { + if (!EnableRepartitionedInsertSelect) + { + return false; + } + /* don't redistribute if query is not distributed or requires merge on coordinator */ if (!IsCitusCustomScan(selectPlan)) { diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 11cc2c565..0e9d1a712 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -32,6 +32,7 @@ #include "distributed/connection_management.h" #include "distributed/cte_inline.h" #include "distributed/distributed_deadlock_detection.h" +#include "distributed/insert_select_executor.h" #include "distributed/intermediate_result_pruning.h" #include "distributed/local_executor.h" #include "distributed/maintenanced.h" @@ -457,6 +458,16 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.enable_repartitioned_insert_select", + gettext_noop("Enables repartitioned INSERT/SELECTs"), + NULL, + &EnableRepartitionedInsertSelect, + true, + PGC_USERSET, + GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.enable_fast_path_router_planner", gettext_noop("Enables fast path router planner"), diff --git a/src/include/distributed/insert_select_executor.h b/src/include/distributed/insert_select_executor.h index a2101ee1a..e94c7fe9c 100644 --- a/src/include/distributed/insert_select_executor.h +++ b/src/include/distributed/insert_select_executor.h @@ -16,6 +16,7 @@ #include "executor/execdesc.h" +extern bool EnableRepartitionedInsertSelect; extern TupleTableSlot * CoordinatorInsertSelectExecScan(CustomScanState *node); extern bool ExecutingInsertSelect(void); diff --git a/src/test/regress/expected/cte_inline_0.out b/src/test/regress/expected/cte_inline_0.out index 0ec39e590..66bd359e7 100644 --- a/src/test/regress/expected/cte_inline_0.out +++ b/src/test/regress/expected/cte_inline_0.out @@ -791,7 +791,6 @@ INSERT INTO test_table FROM fist_table_cte; DEBUG: distributed INSERT ... SELECT can only select from distributed tables -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: CTE fist_table_cte is going to be inlined via distributed planning DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries @@ -800,6 +799,7 @@ DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT key, value FROM (SELECT fist_table_cte.key, fist_table_cte.value FROM (SELECT intermediate_result.key, intermediate_result.value, intermediate_result.other_value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, other_value jsonb)) fist_table_cte) citus_insert_select_subquery DEBUG: Creating router plan DEBUG: Plan is router executable +DEBUG: Collecting INSERT ... SELECT results on coordinator -- the following INSERT..SELECT is even more interesting -- the CTE becomes pushdownable INSERT INTO test_table diff --git a/src/test/regress/expected/insert_select_repartition.out b/src/test/regress/expected/insert_select_repartition.out index 72202661e..6eb070560 100644 --- a/src/test/regress/expected/insert_select_repartition.out +++ b/src/test/regress/expected/insert_select_repartition.out @@ -875,6 +875,50 @@ SELECT count(*) FROM target_table; 10 (1 row) +-- +-- Disable repartitioned insert/select +-- +TRUNCATE target_table; +SET citus.enable_repartitioned_insert_select TO OFF; +EXPLAIN (costs off) INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: pull to coordinator + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_table_4213629 source_table +(8 rows) + +SET client_min_messages TO DEBUG2; +INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table; +DEBUG: INSERT target table and the source relation of the SELECT partition column value must be colocated in distributed INSERT ... SELECT +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: Collecting INSERT ... SELECT results on coordinator +RESET client_min_messages; +SELECT count(*) FROM target_table; + count +--------------------------------------------------------------------- + 10 +(1 row) + +SET citus.enable_repartitioned_insert_select TO ON; +EXPLAIN (costs off) INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (Citus INSERT ... SELECT) + INSERT/SELECT method: repartition + -> Custom Scan (Citus Adaptive) + Task Count: 4 + Tasks Shown: One of 4 + -> Task + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on source_table_4213629 source_table +(8 rows) + DROP TABLE source_table, target_table; SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE; diff --git a/src/test/regress/sql/insert_select_repartition.sql b/src/test/regress/sql/insert_select_repartition.sql index 4a44ed31b..fb80de3b1 100644 --- a/src/test/regress/sql/insert_select_repartition.sql +++ b/src/test/regress/sql/insert_select_repartition.sql @@ -418,6 +418,24 @@ RESET client_min_messages; SELECT count(*) FROM target_table; +-- +-- Disable repartitioned insert/select +-- + +TRUNCATE target_table; +SET citus.enable_repartitioned_insert_select TO OFF; + +EXPLAIN (costs off) INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table; + +SET client_min_messages TO DEBUG2; +INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table; +RESET client_min_messages; + +SELECT count(*) FROM target_table; + +SET citus.enable_repartitioned_insert_select TO ON; +EXPLAIN (costs off) INSERT INTO target_table SELECT a AS aa, b AS aa, 1 AS aa, 2 AS aa FROM source_table; + DROP TABLE source_table, target_table; SET client_min_messages TO WARNING;