diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 49c0db137..84ddca0d0 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -78,6 +78,7 @@ static int PartitionColumnIndex(List *insertTargetList, Var *partitionColumn); static bool IsRedistributablePlan(Plan *selectPlan, bool hasReturning); static Expr * CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation, int targetTypeMod); +static void WrapTaskListForProjection(List *taskList, List *projectedTargetEntries); /* @@ -233,6 +234,19 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node) partitionColumnIndex, quote_literal_cstr( partitionColumnName)))); + /* + * ExpandWorkerTargetEntry() can add additional columns to the worker + * query. Modify the task queries to only select columns we need. + */ + int requiredColumnCount = list_length(insertTargetList); + List *jobTargetList = distSelectJob->jobQuery->targetList; + if (list_length(jobTargetList) > requiredColumnCount) + { + List *projectedTargetEntries = ListTake(jobTargetList, + requiredColumnCount); + WrapTaskListForProjection(distSelectTaskList, projectedTargetEntries); + } + List **redistributedResults = RedistributeTaskListResults(distResultPrefix, distSelectTaskList, partitionColumnIndex, @@ -1038,3 +1052,41 @@ IsRedistributablePlan(Plan *selectPlan, bool hasReturning) return true; } + + +/* + * WrapForProjection wraps task->queryString to only select given projected + * columns. It modifies the taskList. + */ +static void +WrapTaskListForProjection(List *taskList, List *projectedTargetEntries) +{ + StringInfo projectedColumnsString = makeStringInfo(); + int entryIndex = 0; + TargetEntry *targetEntry = NULL; + foreach_ptr(targetEntry, projectedTargetEntries) + { + if (entryIndex != 0) + { + appendStringInfoChar(projectedColumnsString, ','); + } + + char *columnName = targetEntry->resname; + Assert(columnName != NULL); + appendStringInfoString(projectedColumnsString, quote_identifier(columnName)); + + entryIndex++; + } + + Task *task = NULL; + foreach_ptr(task, taskList) + { + Assert(task->queryString != NULL); + + StringInfo wrappedQuery = makeStringInfo(); + appendStringInfo(wrappedQuery, "SELECT %s FROM (%s) subquery", + projectedColumnsString->data, + task->queryString); + task->queryString = wrappedQuery->data; + } +} diff --git a/src/backend/distributed/utils/listutils.c b/src/backend/distributed/utils/listutils.c index 6fffe1c4f..a94beab91 100644 --- a/src/backend/distributed/utils/listutils.c +++ b/src/backend/distributed/utils/listutils.c @@ -176,3 +176,29 @@ StringJoin(List *stringList, char delimiter) return joinedString->data; } + + +/* + * ListTake returns the first size elements of given list. If size is greater + * than list's length, it returns all elements of list. This is modeled after + * the "take" function used in some Scheme implementations. + */ +List * +ListTake(List *pointerList, int size) +{ + List *result = NIL; + int listIndex = 0; + ListCell *pointerCell = NULL; + + foreach(pointerCell, pointerList) + { + result = lappend(result, lfirst(pointerCell)); + listIndex++; + if (listIndex >= size) + { + break; + } + } + + return result; +} diff --git a/src/include/distributed/listutils.h b/src/include/distributed/listutils.h index 0190a7bea..891be1fe4 100644 --- a/src/include/distributed/listutils.h +++ b/src/include/distributed/listutils.h @@ -73,5 +73,6 @@ extern ArrayType * DatumArrayToArrayType(Datum *datumArray, int datumCount, Oid datumTypeId); extern HTAB * ListToHashSet(List *pointerList, Size keySize, bool isStringList); extern char * StringJoin(List *stringList, char delimiter); +extern List * ListTake(List *pointerList, int size); #endif /* CITUS_LISTUTILS_H */ diff --git a/src/test/regress/expected/insert_select_repartition.out b/src/test/regress/expected/insert_select_repartition.out index e33e7a527..af45576b5 100644 --- a/src/test/regress/expected/insert_select_repartition.out +++ b/src/test/regress/expected/insert_select_repartition.out @@ -268,6 +268,42 @@ SELECT * FROM target_table ORDER BY a; -1 | {1,2,3} (4 rows) +-- +-- worker queries have more columns than necessary. ExpandWorkerTargetEntry() might +-- add additional columns to the target list. +-- +TRUNCATE target_table; +\set VERBOSITY TERSE +-- first verify that the SELECT query below fetches 3 projected columns from workers +SET citus.log_remote_commands TO true; SET client_min_messages TO DEBUG; + CREATE TABLE results AS SELECT max(-a), array_agg(mapped_key) FROM source_table GROUP BY a; +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: building index "pg_toast_xxxxx_index" on table "pg_toast_xxxxx" serially +LOG: issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213601 source_table WHERE true GROUP BY a +LOG: issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213602 source_table WHERE true GROUP BY a +LOG: issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213603 source_table WHERE true GROUP BY a +RESET citus.log_remote_commands; RESET client_min_messages; +DROP TABLE results; +-- now verify that we don't write the extra columns to the intermediate result files and +-- insertion to the target works fine. +SET client_min_messages TO DEBUG2; +INSERT INTO target_table SELECT max(-a), array_agg(mapped_key) FROM source_table GROUP BY a; +DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match +DEBUG: Router planner cannot handle multi-shard select queries +DEBUG: performing repartitioned INSERT ... SELECT +DEBUG: partitioning SELECT query by column index 0 with name 'max' +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213604 AS citus_table_alias (a, b) SELECT max, array_agg FROM read_intermediate_results('{repartitioned_results_from_4213602_to_0,repartitioned_results_from_4213603_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(max integer, array_agg integer[]) +DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213605 AS citus_table_alias (a, b) SELECT max, array_agg FROM read_intermediate_results('{repartitioned_results_from_4213601_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(max integer, array_agg integer[]) +RESET client_min_messages; +SELECT * FROM target_table ORDER BY a; + a | b +--------------------------------------------------------------------- + -4 | {-4} + -3 | {-3} + -2 | {-2} + -1 | {-1} +(4 rows) + DROP TABLE source_table, target_table; SET client_min_messages TO WARNING; DROP SCHEMA insert_select_repartition CASCADE; diff --git a/src/test/regress/sql/insert_select_repartition.sql b/src/test/regress/sql/insert_select_repartition.sql index 72fc4d956..ef60350a9 100644 --- a/src/test/regress/sql/insert_select_repartition.sql +++ b/src/test/regress/sql/insert_select_repartition.sql @@ -123,6 +123,27 @@ RESET client_min_messages; SELECT * FROM target_table ORDER BY a; +-- +-- worker queries have more columns than necessary. ExpandWorkerTargetEntry() might +-- add additional columns to the target list. +-- +TRUNCATE target_table; +\set VERBOSITY TERSE + +-- first verify that the SELECT query below fetches 3 projected columns from workers +SET citus.log_remote_commands TO true; SET client_min_messages TO DEBUG; + CREATE TABLE results AS SELECT max(-a), array_agg(mapped_key) FROM source_table GROUP BY a; +RESET citus.log_remote_commands; RESET client_min_messages; +DROP TABLE results; + +-- now verify that we don't write the extra columns to the intermediate result files and +-- insertion to the target works fine. +SET client_min_messages TO DEBUG2; +INSERT INTO target_table SELECT max(-a), array_agg(mapped_key) FROM source_table GROUP BY a; +RESET client_min_messages; + +SELECT * FROM target_table ORDER BY a; + DROP TABLE source_table, target_table; SET client_min_messages TO WARNING;