mirror of https://github.com/citusdata/citus.git
INSERT/SELECT: show method in EXPLAIN output
parent
b143d9588a
commit
97072c9eb1
|
@ -62,7 +62,7 @@ CustomScanMethods TaskTrackerCustomScanMethods = {
|
||||||
};
|
};
|
||||||
|
|
||||||
CustomScanMethods CoordinatorInsertSelectCustomScanMethods = {
|
CustomScanMethods CoordinatorInsertSelectCustomScanMethods = {
|
||||||
"Citus INSERT ... SELECT via coordinator",
|
"Citus INSERT ... SELECT",
|
||||||
CoordinatorInsertSelectCreateScan
|
CoordinatorInsertSelectCreateScan
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -69,13 +69,11 @@ static List * BuildColumnNameListFromTargetList(Oid targetRelationId,
|
||||||
static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList);
|
static int PartitionColumnIndexFromColumnList(Oid relationId, List *columnNameList);
|
||||||
static List * AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
|
static List * AddInsertSelectCasts(List *insertTargetList, List *selectTargetList,
|
||||||
Oid targetRelationId);
|
Oid targetRelationId);
|
||||||
static bool IsSupportedRedistributionTarget(Oid targetRelationId);
|
|
||||||
static List * RedistributedInsertSelectTaskList(Query *insertSelectQuery,
|
static List * RedistributedInsertSelectTaskList(Query *insertSelectQuery,
|
||||||
DistTableCacheEntry *targetRelation,
|
DistTableCacheEntry *targetRelation,
|
||||||
List **redistributedResults,
|
List **redistributedResults,
|
||||||
bool useBinaryFormat);
|
bool useBinaryFormat);
|
||||||
static int PartitionColumnIndex(List *insertTargetList, Var *partitionColumn);
|
static int PartitionColumnIndex(List *insertTargetList, Var *partitionColumn);
|
||||||
static bool IsRedistributablePlan(Plan *selectPlan);
|
|
||||||
static Expr * CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation,
|
static Expr * CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation,
|
||||||
int targetTypeMod);
|
int targetTypeMod);
|
||||||
static void WrapTaskListForProjection(List *taskList, List *projectedTargetEntries);
|
static void WrapTaskListForProjection(List *taskList, List *projectedTargetEntries);
|
||||||
|
@ -873,7 +871,7 @@ CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation,
|
||||||
* IsSupportedRedistributionTarget determines whether re-partitioning into the
|
* IsSupportedRedistributionTarget determines whether re-partitioning into the
|
||||||
* given target relation is supported.
|
* given target relation is supported.
|
||||||
*/
|
*/
|
||||||
static bool
|
bool
|
||||||
IsSupportedRedistributionTarget(Oid targetRelationId)
|
IsSupportedRedistributionTarget(Oid targetRelationId)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *tableEntry = DistributedTableCacheEntry(targetRelationId);
|
DistTableCacheEntry *tableEntry = DistributedTableCacheEntry(targetRelationId);
|
||||||
|
@ -966,7 +964,7 @@ RedistributedInsertSelectTaskList(Query *insertSelectQuery,
|
||||||
ereport(DEBUG2, (errmsg("distributed statement: %s", queryString->data)));
|
ereport(DEBUG2, (errmsg("distributed statement: %s", queryString->data)));
|
||||||
|
|
||||||
LockShardDistributionMetadata(shardId, ShareLock);
|
LockShardDistributionMetadata(shardId, ShareLock);
|
||||||
List *insertShardPlacementList = FinalizedShardPlacementList(shardId);
|
List *insertShardPlacementList = ActiveShardPlacementList(shardId);
|
||||||
|
|
||||||
RelationShard *relationShard = CitusMakeNode(RelationShard);
|
RelationShard *relationShard = CitusMakeNode(RelationShard);
|
||||||
relationShard->relationId = targetShardInterval->relationId;
|
relationShard->relationId = targetShardInterval->relationId;
|
||||||
|
@ -1015,7 +1013,7 @@ PartitionColumnIndex(List *insertTargetList, Var *partitionColumn)
|
||||||
/*
|
/*
|
||||||
* IsRedistributablePlan returns true if the given plan is a redistributable plan.
|
* IsRedistributablePlan returns true if the given plan is a redistributable plan.
|
||||||
*/
|
*/
|
||||||
static bool
|
bool
|
||||||
IsRedistributablePlan(Plan *selectPlan)
|
IsRedistributablePlan(Plan *selectPlan)
|
||||||
{
|
{
|
||||||
/* don't redistribute if query is not distributed or requires merge on coordinator */
|
/* don't redistribute if query is not distributed or requires merge on coordinator */
|
||||||
|
|
|
@ -32,6 +32,7 @@
|
||||||
#include "distributed/multi_logical_planner.h"
|
#include "distributed/multi_logical_planner.h"
|
||||||
#include "distributed/multi_master_planner.h"
|
#include "distributed/multi_master_planner.h"
|
||||||
#include "distributed/multi_physical_planner.h"
|
#include "distributed/multi_physical_planner.h"
|
||||||
|
#include "distributed/multi_router_planner.h"
|
||||||
#include "distributed/distributed_planner.h"
|
#include "distributed/distributed_planner.h"
|
||||||
#include "distributed/multi_server_executor.h"
|
#include "distributed/multi_server_executor.h"
|
||||||
#include "distributed/remote_commands.h"
|
#include "distributed/remote_commands.h"
|
||||||
|
@ -139,9 +140,12 @@ CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors,
|
||||||
DistributedPlan *distributedPlan = scanState->distributedPlan;
|
DistributedPlan *distributedPlan = scanState->distributedPlan;
|
||||||
Query *insertSelectQuery = distributedPlan->insertSelectQuery;
|
Query *insertSelectQuery = distributedPlan->insertSelectQuery;
|
||||||
Query *query = BuildSelectForInsertSelect(insertSelectQuery);
|
Query *query = BuildSelectForInsertSelect(insertSelectQuery);
|
||||||
|
RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery);
|
||||||
|
Oid targetRelationId = insertRte->relid;
|
||||||
IntoClause *into = NULL;
|
IntoClause *into = NULL;
|
||||||
ParamListInfo params = NULL;
|
ParamListInfo params = NULL;
|
||||||
char *queryString = NULL;
|
char *queryString = NULL;
|
||||||
|
int cursorOptions = CURSOR_OPT_PARALLEL_OK;
|
||||||
|
|
||||||
if (es->analyze)
|
if (es->analyze)
|
||||||
{
|
{
|
||||||
|
@ -150,6 +154,17 @@ CoordinatorInsertSelectExplainScan(CustomScanState *node, List *ancestors,
|
||||||
"... SELECT commands via the coordinator")));
|
"... SELECT commands via the coordinator")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
PlannedStmt *selectPlan = pg_plan_query(query, cursorOptions, params);
|
||||||
|
if (IsRedistributablePlan(selectPlan->planTree) &&
|
||||||
|
IsSupportedRedistributionTarget(targetRelationId))
|
||||||
|
{
|
||||||
|
ExplainPropertyText("INSERT/SELECT method", "repartition", es);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ExplainPropertyText("INSERT/SELECT method", "pull to coordinator", es);
|
||||||
|
}
|
||||||
|
|
||||||
ExplainOpenGroup("Select Query", "Select Query", false, es);
|
ExplainOpenGroup("Select Query", "Select Query", false, es);
|
||||||
|
|
||||||
/* explain the inner SELECT query */
|
/* explain the inner SELECT query */
|
||||||
|
|
|
@ -20,6 +20,8 @@
|
||||||
extern TupleTableSlot * CoordinatorInsertSelectExecScan(CustomScanState *node);
|
extern TupleTableSlot * CoordinatorInsertSelectExecScan(CustomScanState *node);
|
||||||
extern bool ExecutingInsertSelect(void);
|
extern bool ExecutingInsertSelect(void);
|
||||||
extern Query * BuildSelectForInsertSelect(Query *insertSelectQuery);
|
extern Query * BuildSelectForInsertSelect(Query *insertSelectQuery);
|
||||||
|
extern bool IsSupportedRedistributionTarget(Oid targetRelationId);
|
||||||
|
extern bool IsRedistributablePlan(Plan *selectPlan);
|
||||||
|
|
||||||
|
|
||||||
#endif /* INSERT_SELECT_EXECUTOR_H */
|
#endif /* INSERT_SELECT_EXECUTOR_H */
|
||||||
|
|
|
@ -275,9 +275,9 @@ SET citus.log_remote_commands TO true; SET client_min_messages TO DEBUG;
|
||||||
CREATE TABLE results AS SELECT max(-a), array_agg(mapped_key) FROM source_table GROUP BY a;
|
CREATE TABLE results AS SELECT max(-a), array_agg(mapped_key) FROM source_table GROUP BY a;
|
||||||
DEBUG: Router planner cannot handle multi-shard select queries
|
DEBUG: Router planner cannot handle multi-shard select queries
|
||||||
DEBUG: building index "pg_toast_xxxxx_index" on table "pg_toast_xxxxx" serially
|
DEBUG: building index "pg_toast_xxxxx_index" on table "pg_toast_xxxxx" serially
|
||||||
LOG: issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213601 source_table WHERE true GROUP BY a
|
NOTICE: issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213601 source_table WHERE true GROUP BY a
|
||||||
LOG: issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213602 source_table WHERE true GROUP BY a
|
NOTICE: issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213602 source_table WHERE true GROUP BY a
|
||||||
LOG: issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213603 source_table WHERE true GROUP BY a
|
NOTICE: issuing SELECT max((OPERATOR(pg_catalog.-) a)) AS max, array_agg(mapped_key) AS array_agg, a AS worker_column_3 FROM insert_select_repartition.source_table_4213603 source_table WHERE true GROUP BY a
|
||||||
RESET citus.log_remote_commands; RESET client_min_messages;
|
RESET citus.log_remote_commands; RESET client_min_messages;
|
||||||
DROP TABLE results;
|
DROP TABLE results;
|
||||||
-- now verify that we don't write the extra columns to the intermediate result files and
|
-- now verify that we don't write the extra columns to the intermediate result files and
|
||||||
|
@ -555,6 +555,24 @@ SELECT * FROM target_table ORDER BY a;
|
||||||
5 | 400
|
5 | 400
|
||||||
(6 rows)
|
(6 rows)
|
||||||
|
|
||||||
|
--
|
||||||
|
-- EXPLAIN output should specify repartitioned INSERT/SELECT
|
||||||
|
--
|
||||||
|
EXPLAIN INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus INSERT ... SELECT) (cost=0.00..0.00 rows=0 width=0)
|
||||||
|
INSERT/SELECT method: repartition
|
||||||
|
-> Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=0 width=0)
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> HashAggregate (cost=43.90..45.90 rows=200 width=8)
|
||||||
|
Group Key: a
|
||||||
|
-> Seq Scan on source_table_4213606 source_table (cost=0.00..32.60 rows=2260 width=8)
|
||||||
|
(10 rows)
|
||||||
|
|
||||||
DROP TABLE source_table, target_table;
|
DROP TABLE source_table, target_table;
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
DROP SCHEMA insert_select_repartition CASCADE;
|
DROP SCHEMA insert_select_repartition CASCADE;
|
||||||
|
|
|
@ -1188,7 +1188,8 @@ ROLLBACK;
|
||||||
EXPLAIN (COSTS OFF)
|
EXPLAIN (COSTS OFF)
|
||||||
INSERT INTO lineitem_hash_part
|
INSERT INTO lineitem_hash_part
|
||||||
SELECT o_orderkey FROM orders_hash_part LIMIT 3;
|
SELECT o_orderkey FROM orders_hash_part LIMIT 3;
|
||||||
Custom Scan (Citus INSERT ... SELECT via coordinator)
|
Custom Scan (Citus INSERT ... SELECT)
|
||||||
|
INSERT/SELECT method: pull to coordinator
|
||||||
-> Limit
|
-> Limit
|
||||||
-> Custom Scan (Citus Adaptive)
|
-> Custom Scan (Citus Adaptive)
|
||||||
Task Count: 4
|
Task Count: 4
|
||||||
|
@ -1205,7 +1206,8 @@ t
|
||||||
EXPLAIN (COSTS OFF)
|
EXPLAIN (COSTS OFF)
|
||||||
INSERT INTO lineitem_hash_part (l_orderkey, l_quantity)
|
INSERT INTO lineitem_hash_part (l_orderkey, l_quantity)
|
||||||
SELECT o_orderkey, 5 FROM orders_hash_part LIMIT 3;
|
SELECT o_orderkey, 5 FROM orders_hash_part LIMIT 3;
|
||||||
Custom Scan (Citus INSERT ... SELECT via coordinator)
|
Custom Scan (Citus INSERT ... SELECT)
|
||||||
|
INSERT/SELECT method: pull to coordinator
|
||||||
-> Limit
|
-> Limit
|
||||||
-> Custom Scan (Citus Adaptive)
|
-> Custom Scan (Citus Adaptive)
|
||||||
Task Count: 4
|
Task Count: 4
|
||||||
|
@ -1217,7 +1219,8 @@ Custom Scan (Citus INSERT ... SELECT via coordinator)
|
||||||
EXPLAIN (COSTS OFF)
|
EXPLAIN (COSTS OFF)
|
||||||
INSERT INTO lineitem_hash_part (l_orderkey)
|
INSERT INTO lineitem_hash_part (l_orderkey)
|
||||||
SELECT s FROM generate_series(1,5) s;
|
SELECT s FROM generate_series(1,5) s;
|
||||||
Custom Scan (Citus INSERT ... SELECT via coordinator)
|
Custom Scan (Citus INSERT ... SELECT)
|
||||||
|
INSERT/SELECT method: pull to coordinator
|
||||||
-> Function Scan on generate_series s
|
-> Function Scan on generate_series s
|
||||||
-- WHERE EXISTS forces pg12 to materialize cte
|
-- WHERE EXISTS forces pg12 to materialize cte
|
||||||
EXPLAIN (COSTS OFF)
|
EXPLAIN (COSTS OFF)
|
||||||
|
@ -1225,7 +1228,8 @@ WITH cte1 AS (SELECT s FROM generate_series(1,10) s)
|
||||||
INSERT INTO lineitem_hash_part
|
INSERT INTO lineitem_hash_part
|
||||||
WITH cte1 AS (SELECT * FROM cte1 WHERE EXISTS (SELECT * FROM cte1) LIMIT 5)
|
WITH cte1 AS (SELECT * FROM cte1 WHERE EXISTS (SELECT * FROM cte1) LIMIT 5)
|
||||||
SELECT s FROM cte1 WHERE EXISTS (SELECT * FROM cte1);
|
SELECT s FROM cte1 WHERE EXISTS (SELECT * FROM cte1);
|
||||||
Custom Scan (Citus INSERT ... SELECT via coordinator)
|
Custom Scan (Citus INSERT ... SELECT)
|
||||||
|
INSERT/SELECT method: pull to coordinator
|
||||||
-> Result
|
-> Result
|
||||||
One-Time Filter: $3
|
One-Time Filter: $3
|
||||||
CTE cte1
|
CTE cte1
|
||||||
|
@ -1244,7 +1248,8 @@ EXPLAIN (COSTS OFF)
|
||||||
INSERT INTO lineitem_hash_part
|
INSERT INTO lineitem_hash_part
|
||||||
( SELECT s FROM generate_series(1,5) s) UNION
|
( SELECT s FROM generate_series(1,5) s) UNION
|
||||||
( SELECT s FROM generate_series(5,10) s);
|
( SELECT s FROM generate_series(5,10) s);
|
||||||
Custom Scan (Citus INSERT ... SELECT via coordinator)
|
Custom Scan (Citus INSERT ... SELECT)
|
||||||
|
INSERT/SELECT method: pull to coordinator
|
||||||
-> HashAggregate
|
-> HashAggregate
|
||||||
Group Key: s.s
|
Group Key: s.s
|
||||||
-> Append
|
-> Append
|
||||||
|
|
|
@ -265,6 +265,13 @@ RESET client_min_messages;
|
||||||
|
|
||||||
SELECT * FROM target_table ORDER BY a;
|
SELECT * FROM target_table ORDER BY a;
|
||||||
|
|
||||||
|
--
|
||||||
|
-- EXPLAIN output should specify repartitioned INSERT/SELECT
|
||||||
|
--
|
||||||
|
|
||||||
|
EXPLAIN INSERT INTO target_table SELECT a, max(b) FROM source_table GROUP BY a;
|
||||||
|
|
||||||
|
|
||||||
DROP TABLE source_table, target_table;
|
DROP TABLE source_table, target_table;
|
||||||
|
|
||||||
SET client_min_messages TO WARNING;
|
SET client_min_messages TO WARNING;
|
||||||
|
|
Loading…
Reference in New Issue