mirror of https://github.com/citusdata/citus.git
Fix unexpected column index error for repartitioned merge (#8201)
DESCRIPTION: Fixes a bug that causes an unexpected error when executing
repartitioned merge.
Fixes #8180.
This was happening because of a bug in
SourceResultPartitionColumnIndex(). And to fix it, this PR avoids
using DistributionColumnIndex() in SourceResultPartitionColumnIndex().
Instead, invents FindTargetListEntryWithVarExprAttno(), which finds
the index of the target entry in the source query's target list that
can be used to repartition the source for a repartitioned merge. In
short, to find the source target entry that references the Var used in
the ON (..) clause and that references the source rte, we should check the
varattno of the underlying expr, which presumably is always a Var for
repartitioned merge as we always wrap the source rte with a subquery,
where all target entries point to the columns of the original source
relation.
Using DistributionColumnIndex() prior to 13.0 wasn't causing such an
issue because prior to 13.0, the varattno of the underlying expr of
the source target entries was almost (*1) always equal to resno of the
target entry as we were including all target entries of the source
relation. However, starting with #7659, which was merged to main before
13.0, we started using CreateFilteredTargetListForRelation() instead of
CreateAllTargetListForRelation() to compute the target entry list for
the source rte to fix another bug. So we cannot revert to using
CreateAllTargetListForRelation() because otherwise we would re-introduce
the bug that it helped fix, so we instead had to find a way to properly
deal with the "filtered target list"s, as in this commit. Plus (*1),
even before #7659, probably we would still fail when the source relation
has dropped attributes or such because that would probably also cause
such a mismatch between the varattno of the underlying expr of the
target entry and its resno.
(cherry picked from commit 83b25e1fb1)
release-13.2-ihalatci-8302
parent
e8e06d8d0c
commit
490884176a
|
|
@ -41,6 +41,7 @@
|
||||||
static int SourceResultPartitionColumnIndex(Query *mergeQuery,
|
static int SourceResultPartitionColumnIndex(Query *mergeQuery,
|
||||||
List *sourceTargetList,
|
List *sourceTargetList,
|
||||||
CitusTableCacheEntry *targetRelation);
|
CitusTableCacheEntry *targetRelation);
|
||||||
|
static int FindTargetListEntryWithVarExprAttno(List *targetList, AttrNumber varattno);
|
||||||
static Var * ValidateAndReturnVarIfSupported(Node *entryExpr);
|
static Var * ValidateAndReturnVarIfSupported(Node *entryExpr);
|
||||||
static DeferredErrorMessage * DeferErrorIfTargetHasFalseClause(Oid targetRelationId,
|
static DeferredErrorMessage * DeferErrorIfTargetHasFalseClause(Oid targetRelationId,
|
||||||
PlannerRestrictionContext *
|
PlannerRestrictionContext *
|
||||||
|
|
@ -1411,7 +1412,8 @@ SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList,
|
||||||
Assert(sourceRepartitionVar);
|
Assert(sourceRepartitionVar);
|
||||||
|
|
||||||
int sourceResultRepartitionColumnIndex =
|
int sourceResultRepartitionColumnIndex =
|
||||||
DistributionColumnIndex(sourceTargetList, sourceRepartitionVar);
|
FindTargetListEntryWithVarExprAttno(sourceTargetList,
|
||||||
|
sourceRepartitionVar->varattno);
|
||||||
|
|
||||||
if (sourceResultRepartitionColumnIndex == -1)
|
if (sourceResultRepartitionColumnIndex == -1)
|
||||||
{
|
{
|
||||||
|
|
@ -1562,6 +1564,33 @@ FetchAndValidateInsertVarIfExists(Oid targetRelationId, Query *query)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* FindTargetListEntryWithVarExprAttno finds the index of the target
|
||||||
|
* entry whose expr is a Var that points to input varattno.
|
||||||
|
*
|
||||||
|
* If no such target entry is found, it returns -1.
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
FindTargetListEntryWithVarExprAttno(List *targetList, AttrNumber varattno)
|
||||||
|
{
|
||||||
|
int targetEntryIndex = 0;
|
||||||
|
|
||||||
|
TargetEntry *targetEntry = NULL;
|
||||||
|
foreach_declared_ptr(targetEntry, targetList)
|
||||||
|
{
|
||||||
|
if (IsA(targetEntry->expr, Var) &&
|
||||||
|
((Var *) targetEntry->expr)->varattno == varattno)
|
||||||
|
{
|
||||||
|
return targetEntryIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
targetEntryIndex++;
|
||||||
|
}
|
||||||
|
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* IsLocalTableModification returns true if the table modified is a Postgres table.
|
* IsLocalTableModification returns true if the table modified is a Postgres table.
|
||||||
* We do not support recursive planning for MERGE yet, so we could have a join
|
* We do not support recursive planning for MERGE yet, so we could have a join
|
||||||
|
|
|
||||||
|
|
@ -193,13 +193,148 @@ SQL function "compare_data" statement 2
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
---- https://github.com/citusdata/citus/issues/8180 ----
|
||||||
|
CREATE TABLE dist_1 (a int, b int, c int);
|
||||||
|
CREATE TABLE dist_2 (a int, b int, c int);
|
||||||
|
CREATE TABLE dist_different_order_1 (b int, a int, c int);
|
||||||
|
SELECT create_distributed_table('dist_1', 'a');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_distributed_table('dist_2', 'a');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_distributed_table('dist_different_order_1', 'a');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
MERGE INTO dist_1
|
||||||
|
USING dist_2
|
||||||
|
ON (dist_1.a = dist_2.b)
|
||||||
|
WHEN MATCHED THEN UPDATE SET b = dist_2.b;
|
||||||
|
MERGE INTO dist_1
|
||||||
|
USING dist_1 src
|
||||||
|
ON (dist_1.a = src.b)
|
||||||
|
WHEN MATCHED THEN UPDATE SET b = src.b;
|
||||||
|
MERGE INTO dist_different_order_1
|
||||||
|
USING dist_1
|
||||||
|
ON (dist_different_order_1.a = dist_1.b)
|
||||||
|
WHEN MATCHED THEN UPDATE SET b = dist_1.b;
|
||||||
|
CREATE TABLE dist_1_cast (a int, b int);
|
||||||
|
CREATE TABLE dist_2_cast (a int, b numeric);
|
||||||
|
SELECT create_distributed_table('dist_1_cast', 'a');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_distributed_table('dist_2_cast', 'a');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
MERGE INTO dist_1_cast
|
||||||
|
USING dist_2_cast
|
||||||
|
ON (dist_1_cast.a = dist_2_cast.b)
|
||||||
|
WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b;
|
||||||
|
ERROR: In the MERGE ON clause, there is a datatype mismatch between target's distribution column and the expression originating from the source.
|
||||||
|
DETAIL: If the types are different, Citus uses different hash functions for the two column types, which might lead to incorrect repartitioning of the result data
|
||||||
|
MERGE INTO dist_1_cast
|
||||||
|
USING (SELECT a, b::int as b FROM dist_2_cast) dist_2_cast
|
||||||
|
ON (dist_1_cast.a = dist_2_cast.b)
|
||||||
|
WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b;
|
||||||
|
-- a more sophisticated example
|
||||||
|
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
|
||||||
|
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
|
||||||
|
CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
|
||||||
|
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
|
||||||
|
SELECT create_distributed_table('dist_source', 'tstamp_col');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT create_distributed_table('dist_target', 'int_col');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||||
|
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||||
|
i,
|
||||||
|
ARRAY[i::text, (i+1)::text, (i+2)::text],
|
||||||
|
'source_' || i,
|
||||||
|
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
|
||||||
|
FROM generate_series(1001, 2000) i;
|
||||||
|
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||||
|
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||||
|
i,
|
||||||
|
ARRAY[i::text, (i+1)::text, (i+2)::text],
|
||||||
|
'source_' || i,
|
||||||
|
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
|
||||||
|
FROM generate_series(901, 1000) i;
|
||||||
|
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||||
|
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||||
|
i,
|
||||||
|
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
|
||||||
|
'source_' || i,
|
||||||
|
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
|
||||||
|
FROM generate_series(1501, 2000) i;
|
||||||
|
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||||
|
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||||
|
i,
|
||||||
|
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
|
||||||
|
'source_' || i-1,
|
||||||
|
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
|
||||||
|
FROM generate_series(1401, 1500) i;
|
||||||
|
INSERT INTO local_source SELECT * FROM dist_source;
|
||||||
|
INSERT INTO local_target SELECT * FROM dist_target;
|
||||||
|
-- execute the query on distributed tables
|
||||||
|
MERGE INTO dist_target target_alias
|
||||||
|
USING dist_source source_alias
|
||||||
|
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
|
||||||
|
WHEN MATCHED THEN UPDATE SET
|
||||||
|
tstamp_col = source_alias.tstamp_col + interval '3 day',
|
||||||
|
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
|
||||||
|
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
|
||||||
|
text_col = source_alias.json_col->>'a'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
|
||||||
|
-- execute the same query on local tables, everything is the same except table names behind the aliases
|
||||||
|
MERGE INTO local_target target_alias
|
||||||
|
USING local_source source_alias
|
||||||
|
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
|
||||||
|
WHEN MATCHED THEN UPDATE SET
|
||||||
|
tstamp_col = source_alias.tstamp_col + interval '3 day',
|
||||||
|
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
|
||||||
|
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
|
||||||
|
text_col = source_alias.json_col->>'a'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
|
||||||
|
-- compare both targets
|
||||||
|
SELECT COUNT(*) = 0 AS targets_match
|
||||||
|
FROM (
|
||||||
|
SELECT * FROM dist_target
|
||||||
|
EXCEPT
|
||||||
|
SELECT * FROM local_target
|
||||||
|
UNION ALL
|
||||||
|
SELECT * FROM local_target
|
||||||
|
EXCEPT
|
||||||
|
SELECT * FROM dist_target
|
||||||
|
) q;
|
||||||
|
targets_match
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
DROP SCHEMA merge_repartition2_schema CASCADE;
|
DROP SCHEMA merge_repartition2_schema CASCADE;
|
||||||
NOTICE: drop cascades to 8 other objects
|
|
||||||
DETAIL: drop cascades to table pg_target
|
|
||||||
drop cascades to table pg_source
|
|
||||||
drop cascades to function cleanup_data()
|
|
||||||
drop cascades to function setup_data()
|
|
||||||
drop cascades to function check_data(text,text,text,text)
|
|
||||||
drop cascades to function compare_data()
|
|
||||||
drop cascades to table citus_target
|
|
||||||
drop cascades to table citus_source
|
|
||||||
|
|
|
||||||
|
|
@ -126,5 +126,128 @@ WHEN NOT MATCHED THEN
|
||||||
|
|
||||||
SELECT compare_data();
|
SELECT compare_data();
|
||||||
|
|
||||||
DROP SCHEMA merge_repartition2_schema CASCADE;
|
---- https://github.com/citusdata/citus/issues/8180 ----
|
||||||
|
|
||||||
|
CREATE TABLE dist_1 (a int, b int, c int);
|
||||||
|
CREATE TABLE dist_2 (a int, b int, c int);
|
||||||
|
CREATE TABLE dist_different_order_1 (b int, a int, c int);
|
||||||
|
|
||||||
|
SELECT create_distributed_table('dist_1', 'a');
|
||||||
|
SELECT create_distributed_table('dist_2', 'a');
|
||||||
|
SELECT create_distributed_table('dist_different_order_1', 'a');
|
||||||
|
|
||||||
|
MERGE INTO dist_1
|
||||||
|
USING dist_2
|
||||||
|
ON (dist_1.a = dist_2.b)
|
||||||
|
WHEN MATCHED THEN UPDATE SET b = dist_2.b;
|
||||||
|
|
||||||
|
MERGE INTO dist_1
|
||||||
|
USING dist_1 src
|
||||||
|
ON (dist_1.a = src.b)
|
||||||
|
WHEN MATCHED THEN UPDATE SET b = src.b;
|
||||||
|
|
||||||
|
MERGE INTO dist_different_order_1
|
||||||
|
USING dist_1
|
||||||
|
ON (dist_different_order_1.a = dist_1.b)
|
||||||
|
WHEN MATCHED THEN UPDATE SET b = dist_1.b;
|
||||||
|
|
||||||
|
CREATE TABLE dist_1_cast (a int, b int);
|
||||||
|
CREATE TABLE dist_2_cast (a int, b numeric);
|
||||||
|
|
||||||
|
SELECT create_distributed_table('dist_1_cast', 'a');
|
||||||
|
SELECT create_distributed_table('dist_2_cast', 'a');
|
||||||
|
|
||||||
|
MERGE INTO dist_1_cast
|
||||||
|
USING dist_2_cast
|
||||||
|
ON (dist_1_cast.a = dist_2_cast.b)
|
||||||
|
WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b;
|
||||||
|
|
||||||
|
MERGE INTO dist_1_cast
|
||||||
|
USING (SELECT a, b::int as b FROM dist_2_cast) dist_2_cast
|
||||||
|
ON (dist_1_cast.a = dist_2_cast.b)
|
||||||
|
WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b;
|
||||||
|
|
||||||
|
-- a more sophisticated example
|
||||||
|
CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
|
||||||
|
CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
|
||||||
|
|
||||||
|
CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb);
|
||||||
|
CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int);
|
||||||
|
|
||||||
|
SELECT create_distributed_table('dist_source', 'tstamp_col');
|
||||||
|
SELECT create_distributed_table('dist_target', 'int_col');
|
||||||
|
|
||||||
|
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||||
|
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||||
|
i,
|
||||||
|
ARRAY[i::text, (i+1)::text, (i+2)::text],
|
||||||
|
'source_' || i,
|
||||||
|
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
|
||||||
|
FROM generate_series(1001, 2000) i;
|
||||||
|
|
||||||
|
INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||||
|
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||||
|
i,
|
||||||
|
ARRAY[i::text, (i+1)::text, (i+2)::text],
|
||||||
|
'source_' || i,
|
||||||
|
('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb
|
||||||
|
FROM generate_series(901, 1000) i;
|
||||||
|
|
||||||
|
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||||
|
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||||
|
i,
|
||||||
|
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
|
||||||
|
'source_' || i,
|
||||||
|
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
|
||||||
|
FROM generate_series(1501, 2000) i;
|
||||||
|
|
||||||
|
INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col)
|
||||||
|
SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval,
|
||||||
|
i,
|
||||||
|
ARRAY[(i-1)::text, (i)::text, (i+1)::text],
|
||||||
|
'source_' || i-1,
|
||||||
|
('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb
|
||||||
|
FROM generate_series(1401, 1500) i;
|
||||||
|
|
||||||
|
INSERT INTO local_source SELECT * FROM dist_source;
|
||||||
|
INSERT INTO local_target SELECT * FROM dist_target;
|
||||||
|
|
||||||
|
-- execute the query on distributed tables
|
||||||
|
MERGE INTO dist_target target_alias
|
||||||
|
USING dist_source source_alias
|
||||||
|
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
|
||||||
|
WHEN MATCHED THEN UPDATE SET
|
||||||
|
tstamp_col = source_alias.tstamp_col + interval '3 day',
|
||||||
|
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
|
||||||
|
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
|
||||||
|
text_col = source_alias.json_col->>'a'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
|
||||||
|
|
||||||
|
-- execute the same query on local tables, everything is the same except table names behind the aliases
|
||||||
|
MERGE INTO local_target target_alias
|
||||||
|
USING local_source source_alias
|
||||||
|
ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col)
|
||||||
|
WHEN MATCHED THEN UPDATE SET
|
||||||
|
tstamp_col = source_alias.tstamp_col + interval '3 day',
|
||||||
|
text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col),
|
||||||
|
json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb,
|
||||||
|
text_col = source_alias.json_col->>'a'
|
||||||
|
WHEN NOT MATCHED THEN
|
||||||
|
INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col );
|
||||||
|
|
||||||
|
-- compare both targets
|
||||||
|
|
||||||
|
SELECT COUNT(*) = 0 AS targets_match
|
||||||
|
FROM (
|
||||||
|
SELECT * FROM dist_target
|
||||||
|
EXCEPT
|
||||||
|
SELECT * FROM local_target
|
||||||
|
UNION ALL
|
||||||
|
SELECT * FROM local_target
|
||||||
|
EXCEPT
|
||||||
|
SELECT * FROM dist_target
|
||||||
|
) q;
|
||||||
|
|
||||||
|
SET client_min_messages TO WARNING;
|
||||||
|
DROP SCHEMA merge_repartition2_schema CASCADE;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue