From 188043c5e7c7150909415628c00b4a74d24c407f Mon Sep 17 00:00:00 2001 From: eaydingol Date: Fri, 1 Aug 2025 10:25:43 +0300 Subject: [PATCH] Add a GUC to enable/disable recurring outer join pushdown --- .../distributed/planner/recursive_planning.c | 7 +++++ src/backend/distributed/shared_library_init.c | 16 +++++++++++ src/include/distributed/recursive_planning.h | 1 + .../expected/recurring_join_pushdown.out | 27 +++++++++++++++++++ .../regress/sql/recurring_join_pushdown.sql | 11 +++++--- src/test/regress/sql/recurring_outer_join.sql | 2 +- 6 files changed, 60 insertions(+), 4 deletions(-) diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index 5ceef7874..bd49a096b 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -95,6 +95,8 @@ #include "distributed/shard_pruning.h" #include "distributed/version_compat.h" +bool EnableRecurringOuterJoinPushdown = true; + /* * RecursivePlanningContext is used to recursively plan subqueries * and CTEs, pull results to the coordinator, and push it back into @@ -2767,6 +2769,11 @@ CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query, int *outerRtIndex, RangeTblEntry **outerRte, RangeTblEntry **distRte, int *attnum) { + if (!EnableRecurringOuterJoinPushdown) + { + return false; + } + if (!IS_OUTER_JOIN(joinExpr->jointype)) { return false; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 165aea05f..d98945e01 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1480,6 +1480,21 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.enable_recurring_outer_join_pushdown", + gettext_noop("Enables outer join pushdown for recurring relations."), + gettext_noop("When enabled, Citus will try to push down outer 
joins " + "between recurring and non-recurring relations to workers " + "whenever feasible by introducing correctness constraints " + "to the where clause of the query. Note that if this is " + "disabled, or push down is not feasible, the result will " + "be computed via recursive planning."), + &EnableRecurringOuterJoinPushdown, + true, + PGC_USERSET, + GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE, + NULL, NULL, NULL); + DefineCustomBoolVariable( "citus.enable_repartition_joins", gettext_noop("Allows Citus to repartition data between nodes."), @@ -2319,6 +2334,7 @@ RegisterCitusConfigVariables(void) GUC_UNIT_BYTE | GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE, NULL, NULL, NULL); + DefineCustomIntVariable( "citus.recover_2pc_interval", gettext_noop("Sets the time to wait between recovering 2PCs."), diff --git a/src/include/distributed/recursive_planning.h b/src/include/distributed/recursive_planning.h index 3c43654b9..7bdfef18a 100644 --- a/src/include/distributed/recursive_planning.h +++ b/src/include/distributed/recursive_planning.h @@ -20,6 +20,7 @@ #include "distributed/log_utils.h" #include "distributed/relation_restriction_equivalence.h" +extern bool EnableRecurringOuterJoinPushdown; typedef struct RecursivePlanningContextInternal RecursivePlanningContext; typedef struct RangeTblEntryIndex diff --git a/src/test/regress/expected/recurring_join_pushdown.out b/src/test/regress/expected/recurring_join_pushdown.out index e9ba8da80..b7ded4253 100644 --- a/src/test/regress/expected/recurring_join_pushdown.out +++ b/src/test/regress/expected/recurring_join_pushdown.out @@ -105,6 +105,33 @@ SELECT * FROM r1_local LEFT JOIN d1_local using (a, b) ORDER BY 1, 2; 10 | 2 (9 rows) +-- Disable the pushdown and verify that the join is not pushed down +SET citus.enable_recurring_outer_join_pushdown TO off; +SELECT count(*) FROM r1 LEFT JOIN d1 using (a); +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: Router planner cannot handle multi-shard 
select queries +DEBUG: recursively planning right side of the left join since the outer side is a recurring rel +DEBUG: recursively planning distributed relation "d1" since it is part of a distributed join node that is outer joined with a recurring rel +DEBUG: Wrapping relation "d1" to a subquery +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: no shard pruning constraints on d1 found +DEBUG: shard count after pruning for d1: 4 +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: assigned task to node localhost:xxxxx +DEBUG: generating subplan XXX_1 for subquery SELECT a FROM recurring_join_pushdown.d1 WHERE true +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (recurring_join_pushdown.r1 LEFT JOIN (SELECT d1_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) d1_1) d1 USING (a)) +DEBUG: Creating router plan + count +--------------------------------------------------------------------- + 21 +(1 row) + +SET citus.enable_recurring_outer_join_pushdown TO on; SET client_min_messages TO DEBUG1; -- Test that the join is not pushed down when joined on a non-distributed column SELECT count(*) FROM r1 LEFT JOIN d1 USING (b); diff --git a/src/test/regress/sql/recurring_join_pushdown.sql b/src/test/regress/sql/recurring_join_pushdown.sql index 6522839bf..874f6e532 100644 --- a/src/test/regress/sql/recurring_join_pushdown.sql +++ b/src/test/regress/sql/recurring_join_pushdown.sql @@ -42,6 +42,11 @@ SELECT count(*) FROM r1_local LEFT JOIN d1_local using (a); SELECT * FROM r1 LEFT JOIN d1 using (a, b) ORDER BY 1, 2; SELECT * FROM r1_local LEFT JOIN d1_local using (a, b) ORDER BY 1, 2; +-- Disable the pushdown and verify 
that the join is not pushed down +SET citus.enable_recurring_outer_join_pushdown TO off; +SELECT count(*) FROM r1 LEFT JOIN d1 using (a); +SET citus.enable_recurring_outer_join_pushdown TO on; + SET client_min_messages TO DEBUG1; -- Test that the join is not pushed down when joined on a non-distributed column SELECT count(*) FROM r1 LEFT JOIN d1 USING (b); @@ -64,17 +69,17 @@ SELECT * FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a AND r1_loca EXPLAIN (COSTS OFF) SELECT * FROM r1 LEFT JOIN d1 ON r1.a = d1.a AND r1.b = d1.b ORDER BY 1, 2; SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.b = d1.a; -SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.b = d1_local.a; +SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.b = d1_local.a; -- Test that the join is not pushed down when joined on a non-distributed column SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.b = d1.b; SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.b = d1_local.b; SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.b; -SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.b; +SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.b; SET client_min_messages TO DEBUG1; -- Test that the join is not pushed down when joined on a distributed column with disjunctive conditions SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.a OR r1.b = d1.b; -SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a OR r1_local.b = d1_local.b; +SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a OR r1_local.b = d1_local.b; -- Test join pushdown behavior when the inner part of the join is a subquery -- Using 'using' syntax diff --git a/src/test/regress/sql/recurring_outer_join.sql b/src/test/regress/sql/recurring_outer_join.sql index 8fd1c4340..5bef6b9be 100644 --- a/src/test/regress/sql/recurring_outer_join.sql +++ b/src/test/regress/sql/recurring_outer_join.sql @@ -103,7 +103,7 @@ INSERT INTO ref_2 VALUES 
(null, 401), (null, 402); -CREATE TABLE ref_2_local(LIKE ref_2); +CREATE TABLE ref_2_local(LIKE ref_2); INSERT INTO ref_2_local SELECT * FROM ref_2; CREATE TABLE local_1 (a int, b int);