Fix unexpected column index error for repartitioned merge (#8201)

DESCRIPTION: Fixes a bug that causes an unexpected error when executing
a repartitioned merge.

Fixes #8180.

This was happening because of a bug in
SourceResultPartitionColumnIndex(). To fix it, this PR avoids using
DistributionColumnIndex() in SourceResultPartitionColumnIndex() and
instead introduces FindTargetListEntryWithVarExprAttno(), which finds
the index of the target entry in the source query's target list that
can be used to repartition the source for a repartitioned merge. In
short, to find the source target entry that corresponds to the Var used
in the ON (..) clause and that references the source rte, we should
check the varattno of the underlying expr, which is presumably always a
Var for a repartitioned merge because we always wrap the source rte in
a subquery whose target entries all point to the columns of the
original source relation.
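
To illustrate with a hypothetical sketch (not code from this patch):
assume a source relation dist_2(a int, b int, c int) and an ON (..)
clause that references dist_2.b, so the repartition Var carries
varattno = 2. With a filtered target list, the wrapped source subquery
may keep only that column, and the new lookup simply scans for the
entry whose Var has the matching varattno:

/* Hypothetical sketch (not from this patch): what the filtered source
 * target list looks like and what the new lookup returns for it. */
#include "postgres.h"
#include "catalog/pg_type.h"    /* INT4OID */
#include "nodes/makefuncs.h"    /* makeVar, makeTargetEntry */
#include "nodes/pg_list.h"      /* list_make1 */

static int
FilteredSourceListExample(void)
{
	/* only dist_2.b (attno 2) survives the filtering; it sits at resno 1 */
	Var *bVar = makeVar(1 /* varno of the source rte */,
						2 /* varattno of dist_2.b */,
						INT4OID, -1, InvalidOid, 0);
	TargetEntry *bEntry = makeTargetEntry((Expr *) bVar, 1 /* resno */,
										  pstrdup("b"), false);
	List *sourceTargetList = list_make1(bEntry);

	/* scanning by varattno finds the entry at 0-based index 0, which is
	 * the correct partition column index for the source result */
	return FindTargetListEntryWithVarExprAttno(sourceTargetList,
											   bVar->varattno);
}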

Using DistributionColumnIndex() prior to 13.0 didn't cause such an
issue because back then the varattno of the underlying expr of the
source target entries was almost (*1) always equal to the resno of the
target entry, since we included all target entries of the source
relation. However, starting with #7659, which was merged to main
before 13.0, we began using CreateFilteredTargetListForRelation()
instead of CreateAllTargetListForRelation() to compute the target
entry list for the source rte, in order to fix another bug. We cannot
revert to CreateAllTargetListForRelation() because that would
re-introduce the bug it helped fix, so instead we had to find a way to
properly deal with such "filtered" target lists, as done in this
commit.

(*1) Even before #7659, we would probably still fail when the source
relation had dropped attributes or the like, because that would also
cause a mismatch between the varattno of the target entry's underlying
expr and its resno.
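
For comparison, a hypothetical sketch of the old lookup (assuming, as
the behavior described above suggests, that DistributionColumnIndex()
effectively matches a target entry by its resno): it works as long as
resno == varattno, i.e. on the unfiltered target lists we produced
before #7659, but misses on the filtered list from the sketch above:

/* Hypothetical sketch of a resno-based lookup that breaks on a filtered
 * target list; needs "distributed/listutils.h" for foreach_declared_ptr. */
static int
ResnoBasedLookupExample(List *sourceTargetList, Var *sourceRepartitionVar)
{
	/*
	 * Pre-#7659, unfiltered list for dist_2(a, b, c):
	 *   resno 1 -> Var(varattno = 1)  -- a
	 *   resno 2 -> Var(varattno = 2)  -- b  (resno == varattno, lookup works)
	 *   resno 3 -> Var(varattno = 3)  -- c
	 *
	 * Filtered list (as in the sketch above): resno 1 -> Var(varattno = 2),
	 * so the scan below never matches and we fall through to -1.
	 */
	int targetEntryIndex = 0;
	TargetEntry *targetEntry = NULL;
	foreach_declared_ptr(targetEntry, sourceTargetList)
	{
		if (targetEntry->resno == sourceRepartitionVar->varattno)
		{
			return targetEntryIndex;
		}
		targetEntryIndex++;
	}
	return -1;   /* leads to the unexpected column index error */
}
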
Onur Tirtir 2025-09-23 14:17:51 +03:00 committed by GitHub
parent b5e70f56ab
commit 83b25e1fb1
3 changed files with 298 additions and 11 deletions


@@ -41,6 +41,7 @@
static int SourceResultPartitionColumnIndex(Query *mergeQuery,
List *sourceTargetList,
CitusTableCacheEntry *targetRelation);
static int FindTargetListEntryWithVarExprAttno(List *targetList, AttrNumber varattno);
static Var * ValidateAndReturnVarIfSupported(Node *entryExpr);
static DeferredErrorMessage * DeferErrorIfTargetHasFalseClause(Oid targetRelationId,
PlannerRestrictionContext *
@@ -1414,7 +1415,8 @@ SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList,
	Assert(sourceRepartitionVar);

	int sourceResultRepartitionColumnIndex =
-		DistributionColumnIndex(sourceTargetList, sourceRepartitionVar);
+		FindTargetListEntryWithVarExprAttno(sourceTargetList,
+											sourceRepartitionVar->varattno);

	if (sourceResultRepartitionColumnIndex == -1)
	{
@@ -1565,6 +1567,33 @@ FetchAndValidateInsertVarIfExists(Oid targetRelationId, Query *query)
}

/*
 * FindTargetListEntryWithVarExprAttno finds the index of the target
 * entry whose expr is a Var that points to input varattno.
 *
 * If no such target entry is found, it returns -1.
 */
static int
FindTargetListEntryWithVarExprAttno(List *targetList, AttrNumber varattno)
{
	int targetEntryIndex = 0;

	TargetEntry *targetEntry = NULL;
	foreach_declared_ptr(targetEntry, targetList)
	{
		if (IsA(targetEntry->expr, Var) &&
			((Var *) targetEntry->expr)->varattno == varattno)
		{
			return targetEntryIndex;
		}

		targetEntryIndex++;
	}

	return -1;
}

/*
* IsLocalTableModification returns true if the table modified is a Postgres table.
* We do not support recursive planning for MERGE yet, so we could have a join


@@ -193,13 +193,148 @@ SQL function "compare_data" statement 2
(1 row)
---- https://github.com/citusdata/citus/issues/8180 ----
CREATE TABLE dist_1 (a int, b int, c int);
CREATE TABLE dist_2 (a int, b int, c int);
CREATE TABLE dist_different_order_1 (b int, a int, c int);
SELECT create_distributed_table('dist_1', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('dist_2', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('dist_different_order_1', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
MERGE INTO dist_1
USING dist_2
ON (dist_1.a = dist_2.b)
WHEN MATCHED THEN UPDATE SET b = dist_2.b;
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET b = src.b;
MERGE INTO dist_different_order_1
USING dist_1
ON (dist_different_order_1.a = dist_1.b)
WHEN MATCHED THEN UPDATE SET b = dist_1.b;
CREATE TABLE dist_1_cast (a int, b int);
CREATE TABLE dist_2_cast (a int, b numeric);
SELECT create_distributed_table('dist_1_cast', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('dist_2_cast', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
MERGE INTO dist_1_cast
USING dist_2_cast
ON (dist_1_cast.a = dist_2_cast.b)
WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b;
ERROR: In the MERGE ON clause, there is a datatype mismatch between target's distribution column and the expression originating from the source.
DETAIL: If the types are different, Citus uses different hash functions for the two column types, which might lead to incorrect repartitioning of the result data
MERGE INTO dist_1_cast
USING (SELECT a, b::int as b FROM dist_2_cast) dist_2_cast
ON (dist_1_cast.a = dist_2_cast.b)
WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b;
-- a more sophisticated example
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
SELECT create_distributed_table('dist_source', 'tstamp_col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('dist_target', 'int_col');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[i::text, (i+1)::text, (i+2)::text],
'source_' || i,
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(1001, 2000) i;
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[i::text, (i+1)::text, (i+2)::text],
'source_' || i,
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(901, 1000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
'source_' || i,
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1501, 2000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
'source_' || i-1,
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1401, 1500) i;
INSERT INTO local_source SELECT * FROM dist_source;
INSERT INTO local_target SELECT * FROM dist_target;
-- execute the query on distributed tables
MERGE INTO dist_target target_alias
USING dist_source source_alias
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
WHEN MATCHED THEN UPDATE SET
tstamp_col = source_alias.tstamp_col + interval '3 day',
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
text_col = source_alias.json_col->>'a'
WHEN NOT MATCHED THEN
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
-- execute the same query on local tables, everything is the same except table names behind the aliases
MERGE INTO local_target target_alias
USING local_source source_alias
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
WHEN MATCHED THEN UPDATE SET
tstamp_col = source_alias.tstamp_col + interval '3 day',
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
text_col = source_alias.json_col->>'a'
WHEN NOT MATCHED THEN
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
-- compare both targets
SELECT COUNT(*) = 0 AS targets_match
FROM (
SELECT * FROM dist_target
EXCEPT
SELECT * FROM local_target
UNION ALL
SELECT * FROM local_target
EXCEPT
SELECT * FROM dist_target
) q;
targets_match
---------------------------------------------------------------------
t
(1 row)
SET client_min_messages TO WARNING;
DROP SCHEMA merge_repartition2_schema CASCADE;
NOTICE: drop cascades to 8 other objects
DETAIL: drop cascades to table pg_target
drop cascades to table pg_source
drop cascades to function cleanup_data()
drop cascades to function setup_data()
drop cascades to function check_data(text,text,text,text)
drop cascades to function compare_data()
drop cascades to table citus_target
drop cascades to table citus_source


@@ -126,5 +126,128 @@ WHEN NOT MATCHED THEN
SELECT compare_data();
DROP SCHEMA merge_repartition2_schema CASCADE;
---- https://github.com/citusdata/citus/issues/8180 ----
CREATE TABLE dist_1 (a int, b int, c int);
CREATE TABLE dist_2 (a int, b int, c int);
CREATE TABLE dist_different_order_1 (b int, a int, c int);
SELECT create_distributed_table('dist_1', 'a');
SELECT create_distributed_table('dist_2', 'a');
SELECT create_distributed_table('dist_different_order_1', 'a');
MERGE INTO dist_1
USING dist_2
ON (dist_1.a = dist_2.b)
WHEN MATCHED THEN UPDATE SET b = dist_2.b;
MERGE INTO dist_1
USING dist_1 src
ON (dist_1.a = src.b)
WHEN MATCHED THEN UPDATE SET b = src.b;
MERGE INTO dist_different_order_1
USING dist_1
ON (dist_different_order_1.a = dist_1.b)
WHEN MATCHED THEN UPDATE SET b = dist_1.b;
CREATE TABLE dist_1_cast (a int, b int);
CREATE TABLE dist_2_cast (a int, b numeric);
SELECT create_distributed_table('dist_1_cast', 'a');
SELECT create_distributed_table('dist_2_cast', 'a');
MERGE INTO dist_1_cast
USING dist_2_cast
ON (dist_1_cast.a = dist_2_cast.b)
WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b;
MERGE INTO dist_1_cast
USING (SELECT a, b::int as b FROM dist_2_cast) dist_2_cast
ON (dist_1_cast.a = dist_2_cast.b)
WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b;
-- a more sophisticated example
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
SELECT create_distributed_table('dist_source', 'tstamp_col');
SELECT create_distributed_table('dist_target', 'int_col');
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[i::text, (i+1)::text, (i+2)::text],
'source_' || i,
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(1001, 2000) i;
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[i::text, (i+1)::text, (i+2)::text],
'source_' || i,
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
FROM generate_series(901, 1000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
'source_' || i,
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1501, 2000) i;
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
i,
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
'source_' || i-1,
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
FROM generate_series(1401, 1500) i;
INSERT INTO local_source SELECT * FROM dist_source;
INSERT INTO local_target SELECT * FROM dist_target;
-- execute the query on distributed tables
MERGE INTO dist_target target_alias
USING dist_source source_alias
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
WHEN MATCHED THEN UPDATE SET
tstamp_col = source_alias.tstamp_col + interval '3 day',
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
text_col = source_alias.json_col->>'a'
WHEN NOT MATCHED THEN
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
-- execute the same query on local tables, everything is the same except table names behind the aliases
MERGE INTO local_target target_alias
USING local_source source_alias
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
WHEN MATCHED THEN UPDATE SET
tstamp_col = source_alias.tstamp_col + interval '3 day',
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
text_col = source_alias.json_col->>'a'
WHEN NOT MATCHED THEN
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
-- compare both targets
SELECT COUNT(*) = 0 AS targets_match
FROM (
SELECT * FROM dist_target
EXCEPT
SELECT * FROM local_target
UNION ALL
SELECT * FROM local_target
EXCEPT
SELECT * FROM dist_target
) q;
SET client_min_messages TO WARNING;
DROP SCHEMA merge_repartition2_schema CASCADE;