mirror of https://github.com/citusdata/citus.git
Handle extra columns added in ExpandWorkerTargetEntry() in repartitioned INSERT/SELECT
parent
89463f9760
commit
42c3c03b85
|
@ -78,6 +78,7 @@ static int PartitionColumnIndex(List *insertTargetList, Var *partitionColumn);
|
||||||
static bool IsRedistributablePlan(Plan *selectPlan, bool hasReturning);
|
static bool IsRedistributablePlan(Plan *selectPlan, bool hasReturning);
|
||||||
static Expr * CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation,
|
static Expr * CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation,
|
||||||
int targetTypeMod);
|
int targetTypeMod);
|
||||||
|
static void WrapTaskListForProjection(List *taskList, List *projectedTargetEntries);
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -233,6 +234,19 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
|
||||||
partitionColumnIndex, quote_literal_cstr(
|
partitionColumnIndex, quote_literal_cstr(
|
||||||
partitionColumnName))));
|
partitionColumnName))));
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ExpandWorkerTargetEntry() can add additional columns to the worker
|
||||||
|
* query. Modify the task queries to only select columns we need.
|
||||||
|
*/
|
||||||
|
int requiredColumnCount = list_length(insertTargetList);
|
||||||
|
List *jobTargetList = distSelectJob->jobQuery->targetList;
|
||||||
|
if (list_length(jobTargetList) > requiredColumnCount)
|
||||||
|
{
|
||||||
|
List *projectedTargetEntries = ListTake(jobTargetList,
|
||||||
|
requiredColumnCount);
|
||||||
|
WrapTaskListForProjection(distSelectTaskList, projectedTargetEntries);
|
||||||
|
}
|
||||||
|
|
||||||
List **redistributedResults = RedistributeTaskListResults(distResultPrefix,
|
List **redistributedResults = RedistributeTaskListResults(distResultPrefix,
|
||||||
distSelectTaskList,
|
distSelectTaskList,
|
||||||
partitionColumnIndex,
|
partitionColumnIndex,
|
||||||
|
@ -1038,3 +1052,41 @@ IsRedistributablePlan(Plan *selectPlan, bool hasReturning)
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* WrapForProjection wraps task->queryString to only select given projected
|
||||||
|
* columns. It modifies the taskList.
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
WrapTaskListForProjection(List *taskList, List *projectedTargetEntries)
|
||||||
|
{
|
||||||
|
StringInfo projectedColumnsString = makeStringInfo();
|
||||||
|
int entryIndex = 0;
|
||||||
|
TargetEntry *targetEntry = NULL;
|
||||||
|
foreach_ptr(targetEntry, projectedTargetEntries)
|
||||||
|
{
|
||||||
|
if (entryIndex != 0)
|
||||||
|
{
|
||||||
|
appendStringInfoChar(projectedColumnsString, ',');
|
||||||
|
}
|
||||||
|
|
||||||
|
char *columnName = targetEntry->resname;
|
||||||
|
Assert(columnName != NULL);
|
||||||
|
appendStringInfoString(projectedColumnsString, quote_identifier(columnName));
|
||||||
|
|
||||||
|
entryIndex++;
|
||||||
|
}
|
||||||
|
|
||||||
|
Task *task = NULL;
|
||||||
|
foreach_ptr(task, taskList)
|
||||||
|
{
|
||||||
|
Assert(task->queryString != NULL);
|
||||||
|
|
||||||
|
StringInfo wrappedQuery = makeStringInfo();
|
||||||
|
appendStringInfo(wrappedQuery, "SELECT %s FROM (%s) subquery",
|
||||||
|
projectedColumnsString->data,
|
||||||
|
task->queryString);
|
||||||
|
task->queryString = wrappedQuery->data;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -176,3 +176,29 @@ StringJoin(List *stringList, char delimiter)
|
||||||
|
|
||||||
return joinedString->data;
|
return joinedString->data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ListTake returns the first size elements of given list. If size is greater
|
||||||
|
* than list's length, it returns all elements of list. This is modeled after
|
||||||
|
* the "take" function used in some Scheme implementations.
|
||||||
|
*/
|
||||||
|
List *
|
||||||
|
ListTake(List *pointerList, int size)
|
||||||
|
{
|
||||||
|
List *result = NIL;
|
||||||
|
int listIndex = 0;
|
||||||
|
ListCell *pointerCell = NULL;
|
||||||
|
|
||||||
|
foreach(pointerCell, pointerList)
|
||||||
|
{
|
||||||
|
result = lappend(result, lfirst(pointerCell));
|
||||||
|
listIndex++;
|
||||||
|
if (listIndex >= size)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
|
@ -73,5 +73,6 @@ extern ArrayType * DatumArrayToArrayType(Datum *datumArray, int datumCount,
|
||||||
Oid datumTypeId);
|
Oid datumTypeId);
|
||||||
extern HTAB * ListToHashSet(List *pointerList, Size keySize, bool isStringList);
|
extern HTAB * ListToHashSet(List *pointerList, Size keySize, bool isStringList);
|
||||||
extern char * StringJoin(List *stringList, char delimiter);
|
extern char * StringJoin(List *stringList, char delimiter);
|
||||||
|
extern List * ListTake(List *pointerList, int size);
|
||||||
|
|
||||||
#endif /* CITUS_LISTUTILS_H */
|
#endif /* CITUS_LISTUTILS_H */
|
||||||
|
|
|
@ -268,6 +268,42 @@ SELECT * FROM target_table ORDER BY a;
|
||||||
-1 | {1,2,3}
|
-1 | {1,2,3}
|
||||||
(4 rows)
|
(4 rows)
|
||||||
|
|
||||||
|
--
|
||||||
|
-- worker queries have more columns than necessary. ExpandWorkerTargetEntry() might
|
||||||
|
-- add additional columns to the target list.
|
||||||
|
--
|
||||||
|
TRUNCATE target_table;
|
||||||
|
\set VERBOSITY TERSE
|
||||||
|
-- first verify that the SELECT query below fetches 3 projected columns from workers
|
||||||
|
SET citus.log_remote_commands TO true; SET client_min_messages TO DEBUG;
|
||||||
|
CREATE TABLE results AS SELECT max(-a), array_agg(mapped_key) FROM source_table GROUP BY a;
|
||||||
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
|
DEBUG: building index "pg_toast_xxxxx_index" on table "pg_toast_xxxxx" serially
|
||||||
|
LOG: issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213601 source_table WHERE true GROUP BY a
|
||||||
|
LOG: issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213602 source_table WHERE true GROUP BY a
|
||||||
|
LOG: issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213603 source_table WHERE true GROUP BY a
|
||||||
|
RESET citus.log_remote_commands; RESET client_min_messages;
|
||||||
|
DROP TABLE results;
|
||||||
|
-- now verify that we don't write the extra columns to the intermediate result files and
|
||||||
|
-- insertion to the target works fine.
|
||||||
|
SET client_min_messages TO DEBUG2;
|
||||||
|
INSERT INTO target_table SELECT max(-a), array_agg(mapped_key) FROM source_table GROUP BY a;
|
||||||
|
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
|
||||||
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
|
DEBUG: performing repartitioned INSERT ... SELECT
|
||||||
|
DEBUG: partitioning SELECT query by column index 0 with name 'max'
|
||||||
|
DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213604 AS citus_table_alias (a, b) SELECT max, array_agg FROM read_intermediate_results('{repartitioned_results_from_4213602_to_0,repartitioned_results_from_4213603_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(max integer, array_agg integer[])
|
||||||
|
DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213605 AS citus_table_alias (a, b) SELECT max, array_agg FROM read_intermediate_results('{repartitioned_results_from_4213601_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(max integer, array_agg integer[])
|
||||||
|
RESET client_min_messages;
|
||||||
|
SELECT * FROM target_table ORDER BY a;
|
||||||
|
a | b
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
-4 | {-4}
|
||||||
|
-3 | {-3}
|
||||||
|
-2 | {-2}
|
||||||
|
-1 | {-1}
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
DROP TABLE source_table, target_table;
|
DROP TABLE source_table, target_table;
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
DROP SCHEMA insert_select_repartition CASCADE;
|
DROP SCHEMA insert_select_repartition CASCADE;
|
||||||
|
|
|
@ -123,6 +123,27 @@ RESET client_min_messages;
|
||||||
|
|
||||||
SELECT * FROM target_table ORDER BY a;
|
SELECT * FROM target_table ORDER BY a;
|
||||||
|
|
||||||
|
--
|
||||||
|
-- worker queries have more columns than necessary. ExpandWorkerTargetEntry() might
|
||||||
|
-- add additional columns to the target list.
|
||||||
|
--
|
||||||
|
TRUNCATE target_table;
|
||||||
|
\set VERBOSITY TERSE
|
||||||
|
|
||||||
|
-- first verify that the SELECT query below fetches 3 projected columns from workers
|
||||||
|
SET citus.log_remote_commands TO true; SET client_min_messages TO DEBUG;
|
||||||
|
CREATE TABLE results AS SELECT max(-a), array_agg(mapped_key) FROM source_table GROUP BY a;
|
||||||
|
RESET citus.log_remote_commands; RESET client_min_messages;
|
||||||
|
DROP TABLE results;
|
||||||
|
|
||||||
|
-- now verify that we don't write the extra columns to the intermediate result files and
|
||||||
|
-- insertion to the target works fine.
|
||||||
|
SET client_min_messages TO DEBUG2;
|
||||||
|
INSERT INTO target_table SELECT max(-a), array_agg(mapped_key) FROM source_table GROUP BY a;
|
||||||
|
RESET client_min_messages;
|
||||||
|
|
||||||
|
SELECT * FROM target_table ORDER BY a;
|
||||||
|
|
||||||
DROP TABLE source_table, target_table;
|
DROP TABLE source_table, target_table;
|
||||||
|
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
|
|
Loading…
Reference in New Issue