diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 311567469..49c0db137 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -67,15 +67,17 @@ static HTAB * ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId, static List * BuildColumnNameListFromTargetList(Oid targetRelationId, List *insertTargetList); static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList); -static void AddInsertSelectCasts(List *targetList, TupleDesc destTupleDescriptor); +static List * AddInsertSelectCasts(List *insertTargetList, List *selectTargetList, + Oid targetRelationId); static bool IsSupportedRedistributionTarget(Oid targetRelationId); static List * RedistributedInsertSelectTaskList(Query *insertSelectQuery, DistTableCacheEntry *targetRelation, List **redistributedResults, bool useBinaryFormat); static int PartitionColumnIndex(List *insertTargetList, Var *partitionColumn); -static bool IsRedistributablePlan(Plan *selectPlan, bool hasReturning, bool - hasOnConflict); +static bool IsRedistributablePlan(Plan *selectPlan, bool hasReturning); +static Expr * CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation, + int targetTypeMod); /* @@ -128,7 +130,6 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) Oid targetRelationId = insertRte->relid; char *intermediateResultIdPrefix = distributedPlan->intermediateResultIdPrefix; bool hasReturning = distributedPlan->hasReturning; - bool hasOnConflict = insertSelectQuery->onConflict != NULL; HTAB *shardStateHash = NULL; /* @@ -146,6 +147,16 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) selectRte->subquery = selectQuery; ReorderInsertSelectTargetLists(insertSelectQuery, insertRte, selectRte); + /* + * If the type of insert column and target table's column type is + * different from each 
other, cast the insert column's type to the target + * table's column type. + */ + selectQuery->targetList = + AddInsertSelectCasts(insertSelectQuery->targetList, + selectQuery->targetList, + targetRelationId); + /* * Make a copy of the query, since pg_plan_query may scribble on it and we * want it to be replanned every time if it is stored in a prepared @@ -168,7 +179,7 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) LockPartitionRelations(targetRelationId, RowExclusiveLock); } - if (IsRedistributablePlan(selectPlan->planTree, hasReturning, hasOnConflict) && + if (IsRedistributablePlan(selectPlan->planTree, hasReturning) && IsSupportedRedistributionTarget(targetRelationId)) { ereport(DEBUG1, (errmsg("performing repartitioned INSERT ... SELECT"))); @@ -460,16 +471,6 @@ TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery, uint32 taskIdIndex = 1; uint64 jobId = INVALID_JOB_ID; - Relation distributedRelation = heap_open(targetRelationId, RowExclusiveLock); - TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation); - - /* - * If the type of insert column and target table's column type is - * different from each other. Cast insert column't type to target - * table's column - */ - AddInsertSelectCasts(insertSelectQuery->targetList, destTupleDescriptor); - for (int shardOffset = 0; shardOffset < shardCount; shardOffset++) { ShardInterval *targetShardInterval = @@ -529,8 +530,6 @@ TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery, taskIdIndex++; } - heap_close(distributedRelation, NoLock); - return taskList; } @@ -683,33 +682,182 @@ ExecutingInsertSelect(void) /* - * AddInsertSelectCasts makes sure that the types in columns in targetList - * have the same type as given tuple descriptor by adding necessary type - * casts. + * AddInsertSelectCasts makes sure that the types in columns in the given + * target lists have the same type as the columns of the given relation. + * It might add casts to ensure that.
+ * + * It returns the updated selectTargetList. */ -static void -AddInsertSelectCasts(List *targetList, TupleDesc destTupleDescriptor) +static List * +AddInsertSelectCasts(List *insertTargetList, List *selectTargetList, + Oid targetRelationId) { - ListCell *targetEntryCell = NULL; + ListCell *insertEntryCell = NULL; + ListCell *selectEntryCell = NULL; + List *projectedEntries = NIL; + List *nonProjectedEntries = NIL; - foreach(targetEntryCell, targetList) + /* + * ReorderInsertSelectTargetLists() makes sure that first few columns of + * the SELECT query match the insert targets. It might contain additional + * items for GROUP BY, etc. + */ + Assert(list_length(insertTargetList) <= list_length(selectTargetList)); + + Relation distributedRelation = heap_open(targetRelationId, RowExclusiveLock); + TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation); + + int targetEntryIndex = 0; + forboth(insertEntryCell, insertTargetList, selectEntryCell, selectTargetList) { - TargetEntry *targetEntry = (TargetEntry *) lfirst(targetEntryCell); - Var *insertColumn = (Var *) targetEntry->expr; - Form_pg_attribute attr = TupleDescAttr(destTupleDescriptor, targetEntry->resno - - 1); + TargetEntry *insertEntry = (TargetEntry *) lfirst(insertEntryCell); + TargetEntry *selectEntry = (TargetEntry *) lfirst(selectEntryCell); + Var *insertColumn = (Var *) insertEntry->expr; + Form_pg_attribute attr = TupleDescAttr(destTupleDescriptor, + insertEntry->resno - 1); - if (insertColumn->vartype != attr->atttypid) + Oid sourceType = insertColumn->vartype; + Oid targetType = attr->atttypid; + if (sourceType != targetType) { - CoerceViaIO *coerceExpr = makeNode(CoerceViaIO); - coerceExpr->arg = (Expr *) copyObject(insertColumn); - coerceExpr->resulttype = attr->atttypid; - coerceExpr->resultcollid = attr->attcollation; - coerceExpr->coerceformat = COERCE_IMPLICIT_CAST; - coerceExpr->location = -1; + insertEntry->expr = CastExpr((Expr *) insertColumn, sourceType, targetType, + 
attr->attcollation, attr->atttypmod); - targetEntry->expr = (Expr *) coerceExpr; + /* + * We cannot modify the selectEntry in-place, because ORDER BY or + * GROUP BY clauses might be pointing to it with comparison types + * of the source type. So instead we keep the original one as a + * non-projected entry, so GROUP BY and ORDER BY are happy, and + * create a duplicated projected entry with the coerced expression. + */ + TargetEntry *coercedEntry = copyObject(selectEntry); + coercedEntry->expr = CastExpr((Expr *) selectEntry->expr, sourceType, + targetType, attr->attcollation, + attr->atttypmod); + coercedEntry->ressortgroupref = 0; + + /* + * The only requirement is that users don't use this name in ORDER BY + * or GROUP BY, and it should be unique across the same query. + */ + StringInfo resnameString = makeStringInfo(); + appendStringInfo(resnameString, "auto_coerced_by_citus_%d", targetEntryIndex); + coercedEntry->resname = resnameString->data; + + projectedEntries = lappend(projectedEntries, coercedEntry); + + if (selectEntry->ressortgroupref != 0) + { + selectEntry->resjunk = true; + nonProjectedEntries = lappend(nonProjectedEntries, selectEntry); + } } + else + { + projectedEntries = lappend(projectedEntries, selectEntry); + } + + targetEntryIndex++; + } + + for (int entryIndex = list_length(insertTargetList); + entryIndex < list_length(selectTargetList); + entryIndex++) + { + nonProjectedEntries = lappend(nonProjectedEntries, list_nth(selectTargetList, + entryIndex)); + } + + /* selectEntry->resno must be the ordinal number of the entry */ + selectTargetList = list_concat(projectedEntries, nonProjectedEntries); + int entryResNo = 1; + foreach(selectEntryCell, selectTargetList) + { + TargetEntry *selectEntry = lfirst(selectEntryCell); + selectEntry->resno = entryResNo++; + } + + heap_close(distributedRelation, NoLock); + + return selectTargetList; +} + + +/* + * CastExpr returns an expression which casts the given expr from sourceType to + * the given 
targetType. + */ +static Expr * +CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation, + int targetTypeMod) +{ + Oid coercionFuncId = InvalidOid; + CoercionPathType coercionType = find_coercion_pathway(targetType, sourceType, + COERCION_EXPLICIT, + &coercionFuncId); + + if (coercionType == COERCION_PATH_FUNC) + { + FuncExpr *coerceExpr = makeNode(FuncExpr); + coerceExpr->funcid = coercionFuncId; + coerceExpr->args = list_make1(copyObject(expr)); + coerceExpr->funccollid = targetCollation; + coerceExpr->funcresulttype = targetType; + + return (Expr *) coerceExpr; + } + else if (coercionType == COERCION_PATH_RELABELTYPE) + { + RelabelType *coerceExpr = makeNode(RelabelType); + coerceExpr->arg = copyObject(expr); + coerceExpr->resulttype = targetType; + coerceExpr->resulttypmod = targetTypeMod; + coerceExpr->resultcollid = targetCollation; + coerceExpr->relabelformat = COERCE_IMPLICIT_CAST; + coerceExpr->location = -1; + + return (Expr *) coerceExpr; + } + else if (coercionType == COERCION_PATH_ARRAYCOERCE) + { + Oid sourceBaseType = get_base_element_type(sourceType); + Oid targetBaseType = get_base_element_type(targetType); + + CaseTestExpr *elemExpr = makeNode(CaseTestExpr); + elemExpr->collation = targetCollation; + elemExpr->typeId = sourceBaseType; + elemExpr->typeMod = -1; + + Expr *elemCastExpr = CastExpr((Expr *) elemExpr, sourceBaseType, + targetBaseType, targetCollation, + targetTypeMod); + + ArrayCoerceExpr *coerceExpr = makeNode(ArrayCoerceExpr); + coerceExpr->arg = copyObject(expr); + coerceExpr->elemexpr = elemCastExpr; + coerceExpr->resultcollid = targetCollation; + coerceExpr->resulttype = targetType; + coerceExpr->resulttypmod = targetTypeMod; + coerceExpr->location = -1; + coerceExpr->coerceformat = COERCE_IMPLICIT_CAST; + + return (Expr *) coerceExpr; + } + else if (coercionType == COERCION_PATH_COERCEVIAIO) + { + CoerceViaIO *coerceExpr = makeNode(CoerceViaIO); + coerceExpr->arg = (Expr *) copyObject(expr); + 
coerceExpr->resulttype = targetType; + coerceExpr->resultcollid = targetCollation; + coerceExpr->coerceformat = COERCE_IMPLICIT_CAST; + coerceExpr->location = -1; + + return (Expr *) coerceExpr; + } + else + { + ereport(ERROR, (errmsg("could not find a conversion path from type %d to %d", + sourceType, targetType))); } } @@ -765,16 +913,6 @@ RedistributedInsertSelectTaskList(Query *insertSelectQuery, uint32 taskIdIndex = 1; uint64 jobId = INVALID_JOB_ID; - Relation distributedRelation = heap_open(targetRelationId, RowExclusiveLock); - TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation); - - /* - * If the type of insert column and target table's column type is - * different from each other. Cast insert column't type to target - * table's column - */ - AddInsertSelectCasts(insertSelectQuery->targetList, destTupleDescriptor); - for (shardOffset = 0; shardOffset < shardCount; shardOffset++) { ShardInterval *targetShardInterval = @@ -839,8 +977,6 @@ RedistributedInsertSelectTaskList(Query *insertSelectQuery, taskIdIndex++; } - heap_close(distributedRelation, NoLock); - return taskList; } @@ -872,7 +1008,7 @@ PartitionColumnIndex(List *insertTargetList, Var *partitionColumn) * IsRedistributablePlan returns true if the given plan is a redistrituable plan. */ static bool -IsRedistributablePlan(Plan *selectPlan, bool hasReturning, bool hasOnConflict) +IsRedistributablePlan(Plan *selectPlan, bool hasReturning) { if (hasReturning) { diff --git a/src/test/regress/expected/insert_select_repartition.out b/src/test/regress/expected/insert_select_repartition.out index f135429b6..e33e7a527 100644 --- a/src/test/regress/expected/insert_select_repartition.out +++ b/src/test/regress/expected/insert_select_repartition.out @@ -4,7 +4,6 @@ SET search_path TO 'insert_select_repartition'; SET citus.next_shard_id TO 4213581; SET citus.shard_replication_factor TO 1; SET citus.replication_model TO 'streaming'; --- Test 1 -- 4 shards, hash distributed. 
-- Negate distribution column value. SET citus.shard_count TO 4; @@ -46,8 +45,8 @@ SELECT * FROM target_table WHERE a=-1 OR a=-3 OR a=-7 ORDER BY a; DROP TABLE source_table, target_table; -- --- Test 2. -- range partitioning, composite distribution column +-- CREATE TYPE composite_key_type AS (f1 int, f2 text); -- source CREATE TABLE source_table(f1 int, key composite_key_type, value int, mapped_key composite_key_type); @@ -183,6 +182,92 @@ SELECT * FROM target_table ORDER BY key; -- missing value for distribution column INSERT INTO target_table(value) SELECT value FROM source_table; ERROR: the partition column of table insert_select_repartition.target_table should have a value +DROP TABLE source_table, target_table; +-- different column types +-- verifies that we add necessary casts, otherwise even shard routing won't +-- work correctly and we will see 2 values for the same primary key. +CREATE TABLE target_table(col_1 int primary key, col_2 int); +SELECT create_distributed_table('target_table','col_1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO target_table VALUES (1,2), (2,3), (3,4), (4,5), (5,6); +CREATE TABLE source_table(col_1 numeric, col_2 numeric, col_3 numeric); +SELECT create_distributed_table('source_table','col_1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO source_table VALUES (1,1,1), (3,3,3), (5,5,5); +SET client_min_messages TO DEBUG2; +INSERT INTO target_table +SELECT + col_1, col_2 +FROM + source_table +ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2; +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match +DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery. 
+DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... SELECT +DEBUG: partitioning SELECT query by column index 0 with name 'auto_coerced_by_citus_0' +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213593 AS citus_table_alias (col_1, col_2) SELECT auto_coerced_by_citus_0, auto_coerced_by_citus_1 FROM read_intermediate_results('{repartitioned_results_from_4213597_to_0,repartitioned_results_from_4213600_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 integer) ON CONFLICT(col_1) DO UPDATE SET col_2 = excluded.col_2 +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213594 AS citus_table_alias (col_1, col_2) SELECT auto_coerced_by_citus_0, auto_coerced_by_citus_1 FROM read_intermediate_results('{repartitioned_results_from_4213599_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer, auto_coerced_by_citus_1 integer) ON CONFLICT(col_1) DO UPDATE SET col_2 = excluded.col_2 +RESET client_min_messages; +SELECT * FROM target_table ORDER BY 1; + col_1 | col_2 +--------------------------------------------------------------------- + 1 | 1 + 2 | 3 + 3 | 3 + 4 | 5 + 5 | 5 +(5 rows) + +DROP TABLE source_table, target_table; +-- +-- array coercion +-- +SET citus.shard_count TO 3; +CREATE TABLE source_table(a int, mapped_key int, c float[]); +SELECT create_distributed_table('source_table', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO source_table VALUES (1, -1, ARRAY[1.1, 2.2, 3.3]), (2, -2, ARRAY[4.5, 5.8]), + (3, -3, ARRAY[]::float[]), (4, -4, ARRAY[3.3]); +SET citus.shard_count TO 2; +CREATE TABLE target_table(a int, b int[]); +SELECT create_distributed_table('target_table', 'a'); + create_distributed_table 
+--------------------------------------------------------------------- + +(1 row) + +SET client_min_messages TO DEBUG2; +INSERT INTO target_table SELECT mapped_key, c FROM source_table; +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match +DETAIL: The target table's partition column should correspond to a partition column in the subquery. +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... SELECT +DEBUG: partitioning SELECT query by column index 0 with name 'mapped_key' +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213604 AS citus_table_alias (a, b) SELECT mapped_key, auto_coerced_by_citus_1 FROM read_intermediate_results('{repartitioned_results_from_4213602_to_0,repartitioned_results_from_4213603_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(mapped_key integer, auto_coerced_by_citus_1 integer[]) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213605 AS citus_table_alias (a, b) SELECT mapped_key, auto_coerced_by_citus_1 FROM read_intermediate_results('{repartitioned_results_from_4213601_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(mapped_key integer, auto_coerced_by_citus_1 integer[]) +RESET client_min_messages; +SELECT * FROM target_table ORDER BY a; + a | b +--------------------------------------------------------------------- + -4 | {3} + -3 | {} + -2 | {4,6} + -1 | {1,2,3} +(4 rows) + DROP TABLE source_table, target_table; SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE; diff --git a/src/test/regress/expected/multi_insert_select.out b/src/test/regress/expected/multi_insert_select.out index 6b5c6b2aa..b50514ba9 100644 --- a/src/test/regress/expected/multi_insert_select.out +++ b/src/test/regress/expected/multi_insert_select.out @@ -1083,7 +1083,7 @@ DEBUG: join prunable for intervals 
[1073741824,2147483647] and [-2147483648,-10 DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1] DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823] DEBUG: generating subplan XXX_1 for subquery SELECT sum(raw_events_second.value_4) AS v4, sum(raw_events_first.value_1) AS v1, raw_events_second.value_3 AS id FROM public.raw_events_first, public.raw_events_second WHERE (raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id) GROUP BY raw_events_second.value_3 -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT id, v1, v4 FROM (SELECT intermediate_result.v4, intermediate_result.v1, intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(v4 numeric, v1 bigint, id double precision)) foo +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(id) AS auto_coerced_by_citus_0, int4(v1) AS auto_coerced_by_citus_1, int8(v4) AS auto_coerced_by_citus_2 FROM (SELECT intermediate_result.v4, intermediate_result.v1, intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(v4 numeric, v1 bigint, id double precision)) foo DEBUG: Creating router plan DEBUG: Plan is router executable DEBUG: Collecting INSERT ... SELECT results on coordinator @@ -1142,11 +1142,11 @@ DETAIL: Subquery contains an explicit cast in the same position as the target t HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... 
SELECT -DEBUG: partitioning SELECT query by column index 0 with name 'user_id' -DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300000_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id bigint) -DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300001_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id bigint) -DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300002_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id bigint) -DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id) SELECT user_id FROM read_intermediate_results('{repartitioned_results_from_13300003_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(user_id bigint) +DEBUG: partitioning SELECT query by column index 0 with name 'auto_coerced_by_citus_0' +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300004 AS citus_table_alias (user_id) SELECT auto_coerced_by_citus_0 FROM read_intermediate_results('{repartitioned_results_from_13300000_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer) +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300005 AS citus_table_alias (user_id) SELECT auto_coerced_by_citus_0 FROM read_intermediate_results('{repartitioned_results_from_13300001_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer) +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300006 AS citus_table_alias (user_id) SELECT 
auto_coerced_by_citus_0 FROM read_intermediate_results('{repartitioned_results_from_13300002_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer) +DEBUG: distributed statement: INSERT INTO public.raw_events_second_13300007 AS citus_table_alias (user_id) SELECT auto_coerced_by_citus_0 FROM read_intermediate_results('{repartitioned_results_from_13300003_to_3}'::text[], 'binary'::citus_copy_format) intermediate_result(auto_coerced_by_citus_0 integer) INSERT INTO agg_events (value_3_agg, value_4_agg, @@ -1165,7 +1165,7 @@ DETAIL: Subquery contains an aggregation in the same position as the target tab HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT -DEBUG: partitioning SELECT query by column index 0 with name 'avg' +DEBUG: partitioning SELECT query by column index 0 with name 'auto_coerced_by_citus_0' ERROR: the partition column value cannot be NULL CONTEXT: while executing command on localhost:xxxxx INSERT INTO agg_events @@ -1240,10 +1240,10 @@ DEBUG: join prunable for intervals [1073741824,2147483647] and [-2147483648,-10 DEBUG: join prunable for intervals [1073741824,2147483647] and [-1073741824,-1] DEBUG: join prunable for intervals [1073741824,2147483647] and [0,1073741823] DEBUG: generating subplan XXX_1 for subquery SELECT sum(raw_events_second.value_4) AS v4, raw_events_second.value_1 AS v1, sum(raw_events_second.user_id) AS id FROM public.raw_events_first, public.raw_events_second WHERE (raw_events_first.user_id OPERATOR(pg_catalog.=) raw_events_second.user_id) GROUP BY raw_events_second.value_1 HAVING (sum(raw_events_second.value_4) OPERATOR(pg_catalog.>) (10)::numeric) -DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT f2.id FROM ((SELECT foo.id FROM (SELECT reference_table.user_id AS 
id FROM public.raw_events_first, public.reference_table WHERE (raw_events_first.user_id OPERATOR(pg_catalog.=) reference_table.user_id)) foo) f JOIN (SELECT foo2.v4, foo2.v1, foo2.id FROM (SELECT intermediate_result.v4, intermediate_result.v1, intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(v4 numeric, v1 integer, id bigint)) foo2) f2 ON ((f.id OPERATOR(pg_catalog.=) f2.id))) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(f2.id) AS auto_coerced_by_citus_0 FROM ((SELECT foo.id FROM (SELECT reference_table.user_id AS id FROM public.raw_events_first, public.reference_table WHERE (raw_events_first.user_id OPERATOR(pg_catalog.=) reference_table.user_id)) foo) f JOIN (SELECT foo2.v4, foo2.v1, foo2.id FROM (SELECT intermediate_result.v4, intermediate_result.v1, intermediate_result.id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(v4 numeric, v1 integer, id bigint)) foo2) f2 ON ((f.id OPERATOR(pg_catalog.=) f2.id))) DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT -DEBUG: partitioning SELECT query by column index 0 with name 'id' +DEBUG: partitioning SELECT query by column index 0 with name 'auto_coerced_by_citus_0' -- the second part of the query is not routable since -- GROUP BY not on the partition column (i.e., value_1) and thus join -- on f.id = f2.id is not on the partition key (instead on the sum of partition key) @@ -2149,7 +2149,7 @@ DETAIL: Subquery contains an expression that is not a simple column reference i HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT -DEBUG: partitioning SELECT query by column index 0 with name '?column?' 
+DEBUG: partitioning SELECT query by column index 0 with name 'auto_coerced_by_citus_0' INSERT INTO text_table (part_col) SELECT part_col::text from char_table; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an explicit coercion in the same position as the target table's partition column. @@ -2163,13 +2163,13 @@ DETAIL: Subquery contains an expression that is not a simple column reference i HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT -DEBUG: partitioning SELECT query by column index 0 with name '?column?' +DEBUG: partitioning SELECT query by column index 0 with name 'auto_coerced_by_citus_0' INSERT INTO text_table (part_col) SELECT val FROM text_table; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery. DEBUG: Router planner cannot handle multi-shard select queries DEBUG: performing repartitioned INSERT ... SELECT -DEBUG: partitioning SELECT query by column index 0 with name 'val' +DEBUG: partitioning SELECT query by column index 0 with name 'auto_coerced_by_citus_0' INSERT INTO text_table (part_col) SELECT val::text FROM text_table; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an explicit coercion in the same position as the target table's partition column. 
diff --git a/src/test/regress/expected/multi_insert_select_conflict.out b/src/test/regress/expected/multi_insert_select_conflict.out index b1d1803dd..4cbed96ce 100644 --- a/src/test/regress/expected/multi_insert_select_conflict.out +++ b/src/test/regress/expected/multi_insert_select_conflict.out @@ -54,7 +54,7 @@ FROM ON CONFLICT DO NOTHING; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The target table's partition column should correspond to a partition column in the subquery. -DEBUG: Collecting INSERT ... SELECT results on coordinator +DEBUG: performing repartitioned INSERT ... SELECT -- Since partition columns do not match, pull the data to the coordinator -- and update the non-partition column. Query is wrapped by CTE to return -- ordered result. @@ -216,9 +216,9 @@ WITH cte AS( ) INSERT INTO target_table ((SELECT * FROM cte) UNION (SELECT * FROM cte_2)) ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2 + 1; DEBUG: distributed INSERT ... SELECT can only select from distributed tables -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: CTE cte is going to be inlined via distributed planning DEBUG: CTE cte_2 is going to be inlined via distributed planning +DEBUG: performing repartitioned INSERT ... SELECT SELECT * FROM target_table ORDER BY 1; col_1 | col_2 --------------------------------------------------------------------- @@ -395,7 +395,7 @@ FROM ON CONFLICT(col_1) DO UPDATE SET col_2 = EXCLUDED.col_2; DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: The data type of the target table's partition column should exactly match the data type of the corresponding simple column reference in the subquery. -DEBUG: Collecting INSERT ... SELECT results on coordinator +DEBUG: performing repartitioned INSERT ... 
SELECT SELECT * FROM target_table ORDER BY 1; col_1 | col_2 --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_router_planner_fast_path.out b/src/test/regress/expected/multi_router_planner_fast_path.out index 4a9ad5e73..e89122ec7 100644 --- a/src/test/regress/expected/multi_router_planner_fast_path.out +++ b/src/test/regress/expected/multi_router_planner_fast_path.out @@ -1926,40 +1926,40 @@ INSERT INTO articles_hash SELECT * FROM articles_hash WHERE author_id = $2 AND word_count = $1 OFFSET 0; EXECUTE insert_sel(1,1); DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable +DEBUG: Collecting INSERT ... SELECT results on coordinator EXECUTE insert_sel(1,1); DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable +DEBUG: Collecting INSERT ... SELECT results on coordinator EXECUTE insert_sel(1,1); DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable +DEBUG: Collecting INSERT ... SELECT results on coordinator EXECUTE insert_sel(1,1); DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable +DEBUG: Collecting INSERT ... 
SELECT results on coordinator EXECUTE insert_sel(1,1); DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable +DEBUG: Collecting INSERT ... SELECT results on coordinator EXECUTE insert_sel(1,1); DEBUG: OFFSET clauses are not allowed in distributed INSERT ... SELECT queries -DEBUG: Collecting INSERT ... SELECT results on coordinator DEBUG: Deferred pruning for a fast-path router query DEBUG: Creating router plan DEBUG: Plan is router executable +DEBUG: Collecting INSERT ... SELECT results on coordinator -- one final interesting preperad statement -- where one of the filters is on the target list PREPARE fast_path_agg_filter(int, int) AS diff --git a/src/test/regress/expected/with_dml.out b/src/test/regress/expected/with_dml.out index 2367e602a..766d4cece 100644 --- a/src/test/regress/expected/with_dml.out +++ b/src/test/regress/expected/with_dml.out @@ -96,7 +96,6 @@ INSERT INTO distributed_table DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match DETAIL: Subquery contains an expression that is not a simple column reference in the same position as the target table's partition column. HINT: Ensure the target table's partition column has a corresponding simple column reference to a distributed table's partition column in the subquery. -DEBUG: Collecting INSERT ... 
SELECT results on coordinator DEBUG: CTE ids_to_insert is going to be inlined via distributed planning DEBUG: generating subplan XXX_1 for subquery SELECT (((tenant_id)::integer OPERATOR(pg_catalog.*) 100))::text AS tenant_id FROM with_dml.distributed_table WHERE (dept OPERATOR(pg_catalog.>) 7) DEBUG: generating subplan XXX_2 for subquery SELECT DISTINCT ids_to_insert.tenant_id FROM (SELECT intermediate_result.tenant_id FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(tenant_id text)) ids_to_insert, with_dml.distributed_table WHERE (distributed_table.tenant_id OPERATOR(pg_catalog.<) ids_to_insert.tenant_id) diff --git a/src/test/regress/sql/insert_select_repartition.sql b/src/test/regress/sql/insert_select_repartition.sql index da07796f5..72fc4d956 100644 --- a/src/test/regress/sql/insert_select_repartition.sql +++ b/src/test/regress/sql/insert_select_repartition.sql @@ -6,7 +6,6 @@ SET citus.next_shard_id TO 4213581; SET citus.shard_replication_factor TO 1; SET citus.replication_model TO 'streaming'; --- Test 1 -- 4 shards, hash distributed. -- Negate distribution column value. SET citus.shard_count TO 4; @@ -25,8 +24,8 @@ SELECT * FROM target_table WHERE a=-1 OR a=-3 OR a=-7 ORDER BY a; DROP TABLE source_table, target_table; -- --- Test 2. -- range partitioning, composite distribution column +-- CREATE TYPE composite_key_type AS (f1 int, f2 text); -- source @@ -82,13 +81,15 @@ INSERT INTO target_table(value) SELECT value FROM source_table; DROP TABLE source_table, target_table; -- different column types +-- verifies that we add necessary casts, otherwise even shard routing won't +-- work correctly and we will see 2 values for the same primary key. 
CREATE TABLE target_table(col_1 int primary key, col_2 int); SELECT create_distributed_table('target_table','col_1'); -INSERT INTO target_table VALUES(1,2),(2,3),(3,4),(4,5),(5,6); +INSERT INTO target_table VALUES (1,2), (2,3), (3,4), (4,5), (5,6); CREATE TABLE source_table(col_1 numeric, col_2 numeric, col_3 numeric); SELECT create_distributed_table('source_table','col_1'); -INSERT INTO source_table VALUES(1,1,1),(2,2,2),(3,3,3),(4,4,4),(5,5,5); +INSERT INTO source_table VALUES (1,1,1), (3,3,3), (5,5,5); SET client_min_messages TO DEBUG2; INSERT INTO target_table @@ -103,5 +104,26 @@ SELECT * FROM target_table ORDER BY 1; DROP TABLE source_table, target_table; +-- +-- array coercion +-- +SET citus.shard_count TO 3; +CREATE TABLE source_table(a int, mapped_key int, c float[]); +SELECT create_distributed_table('source_table', 'a'); +INSERT INTO source_table VALUES (1, -1, ARRAY[1.1, 2.2, 3.3]), (2, -2, ARRAY[4.5, 5.8]), + (3, -3, ARRAY[]::float[]), (4, -4, ARRAY[3.3]); + +SET citus.shard_count TO 2; +CREATE TABLE target_table(a int, b int[]); +SELECT create_distributed_table('target_table', 'a'); + +SET client_min_messages TO DEBUG2; +INSERT INTO target_table SELECT mapped_key, c FROM source_table; +RESET client_min_messages; + +SELECT * FROM target_table ORDER BY a; + +DROP TABLE source_table, target_table; + SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE;