diff --git a/src/backend/distributed/planner/merge_planner.c b/src/backend/distributed/planner/merge_planner.c index e848141bc..566ac9c3b 100644 --- a/src/backend/distributed/planner/merge_planner.c +++ b/src/backend/distributed/planner/merge_planner.c @@ -41,6 +41,7 @@ static int SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList, CitusTableCacheEntry *targetRelation); +static int FindTargetListEntryWithVarExprAttno(List *targetList, AttrNumber varattno); static Var * ValidateAndReturnVarIfSupported(Node *entryExpr); static DeferredErrorMessage * DeferErrorIfTargetHasFalseClause(Oid targetRelationId, PlannerRestrictionContext * @@ -1414,7 +1415,8 @@ SourceResultPartitionColumnIndex(Query *mergeQuery, List *sourceTargetList, Assert(sourceRepartitionVar); int sourceResultRepartitionColumnIndex = - DistributionColumnIndex(sourceTargetList, sourceRepartitionVar); + FindTargetListEntryWithVarExprAttno(sourceTargetList, + sourceRepartitionVar->varattno); if (sourceResultRepartitionColumnIndex == -1) { @@ -1565,6 +1567,33 @@ FetchAndValidateInsertVarIfExists(Oid targetRelationId, Query *query) } +/* + * FindTargetListEntryWithVarExprAttno returns the zero-based index of the + * first entry in targetList whose expr is a Var with the given varattno. + * + * Only varattno is compared (varno is not checked); returns -1 if none match. + */ +static int +FindTargetListEntryWithVarExprAttno(List *targetList, AttrNumber varattno) +{ + int targetEntryIndex = 0; + + TargetEntry *targetEntry = NULL; + foreach_declared_ptr(targetEntry, targetList) + { + if (IsA(targetEntry->expr, Var) && + ((Var *) targetEntry->expr)->varattno == varattno) + { + return targetEntryIndex; + } + + targetEntryIndex++; + } + + return -1; +} + + /* * IsLocalTableModification returns true if the table modified is a Postgres table. 
* We do not support recursive planning for MERGE yet, so we could have a join diff --git a/src/test/regress/expected/merge_repartition2.out b/src/test/regress/expected/merge_repartition2.out index 524ae84f7..99cb8fbba 100644 --- a/src/test/regress/expected/merge_repartition2.out +++ b/src/test/regress/expected/merge_repartition2.out @@ -193,13 +193,148 @@ SQL function "compare_data" statement 2 (1 row) +---- https://github.com/citusdata/citus/issues/8180 ---- +CREATE TABLE dist_1 (a int, b int, c int); +CREATE TABLE dist_2 (a int, b int, c int); +CREATE TABLE dist_different_order_1 (b int, a int, c int); +SELECT create_distributed_table('dist_1', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist_2', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist_different_order_1', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +MERGE INTO dist_1 +USING dist_2 +ON (dist_1.a = dist_2.b) +WHEN MATCHED THEN UPDATE SET b = dist_2.b; +MERGE INTO dist_1 +USING dist_1 src +ON (dist_1.a = src.b) +WHEN MATCHED THEN UPDATE SET b = src.b; +MERGE INTO dist_different_order_1 +USING dist_1 +ON (dist_different_order_1.a = dist_1.b) +WHEN MATCHED THEN UPDATE SET b = dist_1.b; +CREATE TABLE dist_1_cast (a int, b int); +CREATE TABLE dist_2_cast (a int, b numeric); +SELECT create_distributed_table('dist_1_cast', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist_2_cast', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +MERGE INTO dist_1_cast +USING dist_2_cast +ON (dist_1_cast.a = dist_2_cast.b) +WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b; 
+ERROR: In the MERGE ON clause, there is a datatype mismatch between target's distribution column and the expression originating from the source. +DETAIL: If the types are different, Citus uses different hash functions for the two column types, which might lead to incorrect repartitioning of the result data +MERGE INTO dist_1_cast +USING (SELECT a, b::int as b FROM dist_2_cast) dist_2_cast +ON (dist_1_cast.a = dist_2_cast.b) +WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b; +-- a more sophisticated example +CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb); +CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int); +CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb); +CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int); +SELECT create_distributed_table('dist_source', 'tstamp_col'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT create_distributed_table('dist_target', 'int_col'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[i::text, (i+1)::text, (i+2)::text], + 'source_' || i, + ('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb +FROM generate_series(1001, 2000) i; +INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[i::text, (i+1)::text, (i+2)::text], + 'source_' || i, + ('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb +FROM generate_series(901, 1000) i; +INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, 
json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[(i-1)::text, (i)::text, (i+1)::text], + 'source_' || i, + ('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb +FROM generate_series(1501, 2000) i; +INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[(i-1)::text, (i)::text, (i+1)::text], + 'source_' || i-1, + ('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb +FROM generate_series(1401, 1500) i; +INSERT INTO local_source SELECT * FROM dist_source; +INSERT INTO local_target SELECT * FROM dist_target; +-- execute the query on distributed tables +MERGE INTO dist_target target_alias +USING dist_source source_alias +ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col) +WHEN MATCHED THEN UPDATE SET + tstamp_col = source_alias.tstamp_col + interval '3 day', + text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col), + json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb, + text_col = source_alias.json_col->>'a' +WHEN NOT MATCHED THEN + INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col ); +-- execute the same query on local tables, everything is the same except table names behind the aliases +MERGE INTO local_target target_alias +USING local_source source_alias +ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col) +WHEN MATCHED THEN UPDATE SET + tstamp_col = source_alias.tstamp_col + interval '3 day', + text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col), + json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb, + text_col = source_alias.json_col->>'a' +WHEN NOT MATCHED THEN + INSERT VALUES (source_alias.text_col, 
source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col ); +-- compare both targets (parentheses needed: EXCEPT and UNION ALL associate left-to-right) +SELECT COUNT(*) = 0 AS targets_match +FROM ( + (SELECT * FROM dist_target + EXCEPT + SELECT * FROM local_target) + UNION ALL + (SELECT * FROM local_target + EXCEPT + SELECT * FROM dist_target) +) q; + targets_match +--------------------------------------------------------------------- + t +(1 row) + +SET client_min_messages TO WARNING; DROP SCHEMA merge_repartition2_schema CASCADE; -NOTICE: drop cascades to 8 other objects -DETAIL: drop cascades to table pg_target -drop cascades to table pg_source -drop cascades to function cleanup_data() -drop cascades to function setup_data() -drop cascades to function check_data(text,text,text,text) -drop cascades to function compare_data() -drop cascades to table citus_target -drop cascades to table citus_source diff --git a/src/test/regress/sql/merge_repartition2.sql b/src/test/regress/sql/merge_repartition2.sql index 354f0605b..6da816bb5 100644 --- a/src/test/regress/sql/merge_repartition2.sql +++ b/src/test/regress/sql/merge_repartition2.sql @@ -126,5 +126,128 @@ WHEN NOT MATCHED THEN SELECT compare_data(); -DROP SCHEMA merge_repartition2_schema CASCADE; +---- https://github.com/citusdata/citus/issues/8180 ---- +CREATE TABLE dist_1 (a int, b int, c int); +CREATE TABLE dist_2 (a int, b int, c int); +CREATE TABLE dist_different_order_1 (b int, a int, c int); + +SELECT create_distributed_table('dist_1', 'a'); +SELECT create_distributed_table('dist_2', 'a'); +SELECT create_distributed_table('dist_different_order_1', 'a'); + +MERGE INTO dist_1 +USING dist_2 +ON (dist_1.a = dist_2.b) +WHEN MATCHED THEN UPDATE SET b = dist_2.b; + +MERGE INTO dist_1 +USING dist_1 src +ON (dist_1.a = src.b) +WHEN MATCHED THEN UPDATE SET b = src.b; + +MERGE INTO dist_different_order_1 +USING dist_1 +ON (dist_different_order_1.a = dist_1.b) +WHEN MATCHED THEN UPDATE SET b = dist_1.b; + +CREATE TABLE dist_1_cast (a int, b int); 
+CREATE TABLE dist_2_cast (a int, b numeric); + +SELECT create_distributed_table('dist_1_cast', 'a'); +SELECT create_distributed_table('dist_2_cast', 'a'); + +MERGE INTO dist_1_cast +USING dist_2_cast +ON (dist_1_cast.a = dist_2_cast.b) +WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b; + +MERGE INTO dist_1_cast +USING (SELECT a, b::int as b FROM dist_2_cast) dist_2_cast +ON (dist_1_cast.a = dist_2_cast.b) +WHEN MATCHED THEN UPDATE SET b = dist_2_cast.b; + +-- a more sophisticated example +CREATE TABLE dist_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb); +CREATE TABLE dist_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int); + +CREATE TABLE local_source (tstamp_col timestamp, int_col int, text_arr_col text[], text_col text, json_col jsonb); +CREATE TABLE local_target (text_col text, tstamp_col timestamp, json_col jsonb, text_arr_col text[], int_col int); + +SELECT create_distributed_table('dist_source', 'tstamp_col'); +SELECT create_distributed_table('dist_target', 'int_col'); + +INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[i::text, (i+1)::text, (i+2)::text], + 'source_' || i, + ('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb +FROM generate_series(1001, 2000) i; + +INSERT INTO dist_source (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[i::text, (i+1)::text, (i+2)::text], + 'source_' || i, + ('{"a": ' || i || ', "b": ' || i+1 || '}')::jsonb +FROM generate_series(901, 1000) i; + +INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[(i-1)::text, (i)::text, (i+1)::text], + 'source_' || i, + ('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb +FROM generate_series(1501, 
2000) i; + +INSERT INTO dist_target (tstamp_col, int_col, text_arr_col, text_col, json_col) +SELECT TIMESTAMP '2025-01-01 00:00:00' + (i || ' days')::interval, + i, + ARRAY[(i-1)::text, (i)::text, (i+1)::text], + 'source_' || i-1, + ('{"a": ' || i*5 || ', "b": ' || i+20 || '}')::jsonb +FROM generate_series(1401, 1500) i; + +INSERT INTO local_source SELECT * FROM dist_source; +INSERT INTO local_target SELECT * FROM dist_target; + +-- execute the query on distributed tables +MERGE INTO dist_target target_alias +USING dist_source source_alias +ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col) +WHEN MATCHED THEN UPDATE SET + tstamp_col = source_alias.tstamp_col + interval '3 day', + text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col), + json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb, + text_col = source_alias.json_col->>'a' +WHEN NOT MATCHED THEN + INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col ); + +-- execute the same query on local tables, everything is the same except table names behind the aliases +MERGE INTO local_target target_alias +USING local_source source_alias +ON (target_alias.text_col = source_alias.text_col AND target_alias.int_col = source_alias.int_col) +WHEN MATCHED THEN UPDATE SET + tstamp_col = source_alias.tstamp_col + interval '3 day', + text_arr_col = array_append(source_alias.text_arr_col, 'updated_' || source_alias.text_col), + json_col = ('{"a": "' || replace(source_alias.text_col, '"', '\"') || '"}')::jsonb, + text_col = source_alias.json_col->>'a' +WHEN NOT MATCHED THEN + INSERT VALUES (source_alias.text_col, source_alias.tstamp_col, source_alias.json_col, source_alias.text_arr_col, source_alias.int_col ); + +-- compare both targets (parentheses needed: EXCEPT and UNION ALL associate left-to-right) + +SELECT COUNT(*) = 0 AS targets_match +FROM ( + (SELECT * FROM dist_target + EXCEPT + SELECT * FROM 
local_target) + UNION ALL + (SELECT * FROM local_target + EXCEPT + SELECT * FROM dist_target) +) q; + +SET client_min_messages TO WARNING; +DROP SCHEMA merge_repartition2_schema CASCADE;