mirror of https://github.com/citusdata/citus.git
Repartitioned INSERT/SELECT: Enable RETURNING
parent
4b14347fc3
commit
494cc383cc
|
@ -75,7 +75,7 @@ static List * RedistributedInsertSelectTaskList(Query *insertSelectQuery,
|
|||
List **redistributedResults,
|
||||
bool useBinaryFormat);
|
||||
static int PartitionColumnIndex(List *insertTargetList, Var *partitionColumn);
|
||||
static bool IsRedistributablePlan(Plan *selectPlan, bool hasReturning);
|
||||
static bool IsRedistributablePlan(Plan *selectPlan);
|
||||
static Expr * CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation,
|
||||
int targetTypeMod);
|
||||
static void WrapTaskListForProjection(List *taskList, List *projectedTargetEntries);
|
||||
|
@ -180,7 +180,7 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
|
|||
LockPartitionRelations(targetRelationId, RowExclusiveLock);
|
||||
}
|
||||
|
||||
if (IsRedistributablePlan(selectPlan->planTree, hasReturning) &&
|
||||
if (IsRedistributablePlan(selectPlan->planTree) &&
|
||||
IsSupportedRedistributionTarget(targetRelationId))
|
||||
{
|
||||
ereport(DEBUG1, (errmsg("performing repartitioned INSERT ... SELECT")));
|
||||
|
@ -1017,13 +1017,8 @@ PartitionColumnIndex(List *insertTargetList, Var *partitionColumn)
|
|||
* IsRedistributablePlan returns true if the given plan is a redistrituable plan.
|
||||
*/
|
||||
static bool
|
||||
IsRedistributablePlan(Plan *selectPlan, bool hasReturning)
|
||||
IsRedistributablePlan(Plan *selectPlan)
|
||||
{
|
||||
if (hasReturning)
|
||||
{
|
||||
return false;
|
||||
}
|
||||
|
||||
/* don't redistribute if query is not distributed or requires merge on coordinator */
|
||||
if (!IsCitusCustomScan(selectPlan))
|
||||
{
|
||||
|
|
|
@ -478,5 +478,35 @@ SELECT * FROM target_table ORDER BY a;
|
|||
-1 | {1,2,3}
|
||||
(12 rows)
|
||||
|
||||
--
|
||||
-- repartitioned INSERT/SELECT with RETURNING
|
||||
--
|
||||
TRUNCATE target_table;
|
||||
SET client_min_messages TO DEBUG2;
|
||||
WITH c AS (
|
||||
INSERT INTO target_table
|
||||
SELECT mapped_key, c FROM source_table
|
||||
RETURNING *)
|
||||
SELECT * FROM c ORDER by a;
|
||||
DEBUG: data-modifying statements are not supported in the WITH clauses of distributed queries
|
||||
DEBUG: generating subplan XXX_1 for CTE c: INSERT INTO insert_select_repartition.target_table (a, b) SELECT mapped_key, c FROM insert_select_repartition.source_table RETURNING target_table.a, target_table.b
|
||||
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT a, b FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer[])) c ORDER BY a
|
||||
DEBUG: Creating router plan
|
||||
DEBUG: Plan is router executable
|
||||
DEBUG: Router planner cannot handle multi-shard select queries
|
||||
DEBUG: performing repartitioned INSERT ... SELECT
|
||||
DEBUG: partitioning SELECT query by column index 0 with name 'mapped_key'
|
||||
DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213604 AS citus_table_alias (a, b) SELECT mapped_key, auto_coerced_by_citus_1 FROM read_intermediate_results('{repartitioned_results_from_4213602_to_0,repartitioned_results_from_4213603_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(mapped_key integer, auto_coerced_by_citus_1 integer[]) RETURNING citus_table_alias.a, citus_table_alias.b
|
||||
DEBUG: distributed statement: INSERT INTO insert_select_repartition.target_table_4213605 AS citus_table_alias (a, b) SELECT mapped_key, auto_coerced_by_citus_1 FROM read_intermediate_results('{repartitioned_results_from_4213601_to_1}'::text[], 'binary'::citus_copy_format) intermediate_result(mapped_key integer, auto_coerced_by_citus_1 integer[]) RETURNING citus_table_alias.a, citus_table_alias.b
|
||||
a | b
|
||||
---------------------------------------------------------------------
|
||||
-4 | {3}
|
||||
-3 | {}
|
||||
-2 | {4,6}
|
||||
-1 | {1,2,3}
|
||||
(4 rows)
|
||||
|
||||
RESET client_min_messages;
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA insert_select_repartition CASCADE;
|
||||
|
|
|
@ -70,7 +70,7 @@ DEBUG: generating subplan XXX_1 for CTE inserted_table: INSERT INTO on_conflict
|
|||
DEBUG: cannot perform distributed INSERT INTO ... SELECT because the partition columns in the source table and subquery do not match
|
||||
DETAIL: The target table's partition column should correspond to a partition column in the subquery.
|
||||
DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT col_1, col_2 FROM (SELECT intermediate_result.col_1, intermediate_result.col_2 FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(col_1 integer, col_2 integer)) inserted_table ORDER BY col_1
|
||||
DEBUG: Collecting INSERT ... SELECT results on coordinator
|
||||
DEBUG: performing repartitioned INSERT ... SELECT
|
||||
col_1 | col_2
|
||||
---------------------------------------------------------------------
|
||||
1 | 1
|
||||
|
@ -324,11 +324,11 @@ BEGIN;
|
|||
FROM source_table_1 ON CONFLICT (col_1) DO UPDATE SET col_2 = 1 RETURNING *;
|
||||
col_1 | col_2
|
||||
---------------------------------------------------------------------
|
||||
1 | 1
|
||||
2 | 1
|
||||
3 | 1
|
||||
4 | 1
|
||||
1 | 1
|
||||
5 | 1
|
||||
2 | 1
|
||||
(5 rows)
|
||||
|
||||
ROLLBACK;
|
||||
|
|
|
@ -217,5 +217,17 @@ INSERT INTO target_table SELECT mapped_key, c FROM source_table;
|
|||
END;
|
||||
SELECT * FROM target_table ORDER BY a;
|
||||
|
||||
--
|
||||
-- repartitioned INSERT/SELECT with RETURNING
|
||||
--
|
||||
TRUNCATE target_table;
|
||||
SET client_min_messages TO DEBUG2;
|
||||
WITH c AS (
|
||||
INSERT INTO target_table
|
||||
SELECT mapped_key, c FROM source_table
|
||||
RETURNING *)
|
||||
SELECT * FROM c ORDER by a;
|
||||
RESET client_min_messages;
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA insert_select_repartition CASCADE;
|
||||
|
|
Loading…
Reference in New Issue