Add a guc for the feature

onur-leftjoin_push-improvements
eaydingol 2025-08-01 10:25:43 +03:00
parent fc109f408b
commit 188043c5e7
6 changed files with 60 additions and 4 deletions

View File

@ -95,6 +95,8 @@
#include "distributed/shard_pruning.h"
#include "distributed/version_compat.h"
bool EnableRecurringOuterJoinPushdown = true;
/*
* RecursivePlanningContext is used to recursively plan subqueries
* and CTEs, pull results to the coordinator, and push it back into
@ -2767,6 +2769,11 @@ CheckPushDownFeasibilityAndComputeIndexes(JoinExpr *joinExpr, Query *query,
int *outerRtIndex, RangeTblEntry **outerRte,
RangeTblEntry **distRte, int *attnum)
{
if (!EnableRecurringOuterJoinPushdown)
{
return false;
}
if (!IS_OUTER_JOIN(joinExpr->jointype))
{
return false;

View File

@ -1480,6 +1480,21 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_recurring_outer_join_pushdown",
gettext_noop("Enables outer join pushdown for recurring relations."),
gettext_noop("When enabled, Citus will try to push down outer joins "
"between recurring and non-recurring relations to workers "
"whenever feasible by introducing correctness constraints "
"to the where clause of the query. Note that if this is "
"disabled, or push down is not feasible, the result will "
"be computed via recursive planning."),
&EnableRecurringOuterJoinPushdown,
true,
PGC_USERSET,
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_repartition_joins",
gettext_noop("Allows Citus to repartition data between nodes."),
@ -2319,6 +2334,7 @@ RegisterCitusConfigVariables(void)
GUC_UNIT_BYTE | GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.recover_2pc_interval",
gettext_noop("Sets the time to wait between recovering 2PCs."),

View File

@ -20,6 +20,7 @@
#include "distributed/log_utils.h"
#include "distributed/relation_restriction_equivalence.h"
extern bool EnableRecurringOuterJoinPushdown;
typedef struct RecursivePlanningContextInternal RecursivePlanningContext;
typedef struct RangeTblEntryIndex

View File

@ -105,6 +105,33 @@ SELECT * FROM r1_local LEFT JOIN d1_local using (a, b) ORDER BY 1, 2;
10 | 2
(9 rows)
-- Disable the pushdown and verify that the join is not pushed down
SET citus.enable_recurring_outer_join_pushdown TO off;
SELECT count(*) FROM r1 LEFT JOIN d1 using (a);
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: recursively planning right side of the left join since the outer side is a recurring rel
DEBUG: recursively planning distributed relation "d1" since it is part of a distributed join node that is outer joined with a recurring rel
DEBUG: Wrapping relation "d1" to a subquery
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: no shard pruning constraints on d1 found
DEBUG: shard count after pruning for d1: 4
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx
DEBUG: generating subplan XXX_1 for subquery SELECT a FROM recurring_join_pushdown.d1 WHERE true
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (recurring_join_pushdown.r1 LEFT JOIN (SELECT d1_1.a, NULL::integer AS b FROM (SELECT intermediate_result.a FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer)) d1_1) d1 USING (a))
DEBUG: Creating router plan
count
---------------------------------------------------------------------
21
(1 row)
SET citus.enable_recurring_outer_join_pushdown TO on;
SET client_min_messages TO DEBUG1;
-- Test that the join is not pushed down when joined on a non-distributed column
SELECT count(*) FROM r1 LEFT JOIN d1 USING (b);

View File

@ -42,6 +42,11 @@ SELECT count(*) FROM r1_local LEFT JOIN d1_local using (a);
SELECT * FROM r1 LEFT JOIN d1 using (a, b) ORDER BY 1, 2;
SELECT * FROM r1_local LEFT JOIN d1_local using (a, b) ORDER BY 1, 2;
-- Disable the pushdown and verify that the join is not pushed down
SET citus.enable_recurring_outer_join_pushdown TO off;
SELECT count(*) FROM r1 LEFT JOIN d1 using (a);
SET citus.enable_recurring_outer_join_pushdown TO on;
SET client_min_messages TO DEBUG1;
-- Test that the join is not pushed down when joined on a non-distributed column
SELECT count(*) FROM r1 LEFT JOIN d1 USING (b);
@ -64,17 +69,17 @@ SELECT * FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a AND r1_loca
EXPLAIN (COSTS OFF) SELECT * FROM r1 LEFT JOIN d1 ON r1.a = d1.a AND r1.b = d1.b ORDER BY 1, 2;
SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.b = d1.a;
SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.b = d1_local.a;
SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.b = d1_local.a;
-- Test that the join is not pushed down when joined on a non-distributed column
SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.b = d1.b;
SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.b = d1_local.b;
SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.b;
SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.b;
SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.b;
SET client_min_messages TO DEBUG1;
-- Test that the join is not pushed down when joined on a distributed column with disjunctive conditions
SELECT count(*) FROM r1 LEFT JOIN d1 ON r1.a = d1.a OR r1.b = d1.b;
SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a OR r1_local.b = d1_local.b;
SELECT count(*) FROM r1_local LEFT JOIN d1_local ON r1_local.a = d1_local.a OR r1_local.b = d1_local.b;
-- Test join pushdown behavior when the inner part of the join is a subquery
-- Using 'using' syntax

View File

@ -103,7 +103,7 @@ INSERT INTO ref_2 VALUES
(null, 401),
(null, 402);
CREATE TABLE ref_2_local(LIKE ref_2);
CREATE TABLE ref_2_local(LIKE ref_2);
INSERT INTO ref_2_local SELECT * FROM ref_2;
CREATE TABLE local_1 (a int, b int);