mirror of https://github.com/citusdata/citus.git
CTE statistics in EXPLAIN ANALYZE
parent
fe8a9c721c
commit
7c52c6edb0
|
@ -76,8 +76,9 @@ typedef struct RemoteFileDestReceiver
|
||||||
CopyOutState copyOutState;
|
CopyOutState copyOutState;
|
||||||
FmgrInfo *columnOutputFunctions;
|
FmgrInfo *columnOutputFunctions;
|
||||||
|
|
||||||
/* number of tuples sent */
|
/* statistics */
|
||||||
uint64 tuplesSent;
|
uint64 tuplesSent;
|
||||||
|
uint64 bytesSent;
|
||||||
} RemoteFileDestReceiver;
|
} RemoteFileDestReceiver;
|
||||||
|
|
||||||
|
|
||||||
|
@ -224,6 +225,17 @@ CreateRemoteFileDestReceiver(const char *resultId, EState *executorState,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* RemoteFileDestReceiverBytesSent returns number of bytes sent per remote worker.
|
||||||
|
*/
|
||||||
|
uint64
|
||||||
|
RemoteFileDestReceiverBytesSent(DestReceiver *destReceiver)
|
||||||
|
{
|
||||||
|
RemoteFileDestReceiver *remoteDestReceiver = (RemoteFileDestReceiver *) destReceiver;
|
||||||
|
return remoteDestReceiver->bytesSent;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* RemoteFileDestReceiverStartup implements the rStartup interface of
|
* RemoteFileDestReceiverStartup implements the rStartup interface of
|
||||||
* RemoteFileDestReceiver. It opens connections to the nodes in initialNodeList,
|
* RemoteFileDestReceiver. It opens connections to the nodes in initialNodeList,
|
||||||
|
@ -415,6 +427,7 @@ RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
|
||||||
MemoryContextSwitchTo(oldContext);
|
MemoryContextSwitchTo(oldContext);
|
||||||
|
|
||||||
resultDest->tuplesSent++;
|
resultDest->tuplesSent++;
|
||||||
|
resultDest->bytesSent += copyData->len;
|
||||||
|
|
||||||
ResetPerTupleExprContext(executorState);
|
ResetPerTupleExprContext(executorState);
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
#include "distributed/transaction_management.h"
|
#include "distributed/transaction_management.h"
|
||||||
#include "distributed/worker_manager.h"
|
#include "distributed/worker_manager.h"
|
||||||
#include "executor/executor.h"
|
#include "executor/executor.h"
|
||||||
|
#include "utils/datetime.h"
|
||||||
|
|
||||||
|
|
||||||
int MaxIntermediateResult = 1048576; /* maximum size in KB the intermediate result can grow to */
|
int MaxIntermediateResult = 1048576; /* maximum size in KB the intermediate result can grow to */
|
||||||
|
@ -73,8 +74,24 @@ ExecuteSubPlans(DistributedPlan *distributedPlan)
|
||||||
CreateRemoteFileDestReceiver(resultId, estate, remoteWorkerNodeList,
|
CreateRemoteFileDestReceiver(resultId, estate, remoteWorkerNodeList,
|
||||||
entry->writeLocalFile);
|
entry->writeLocalFile);
|
||||||
|
|
||||||
|
TimestampTz startTimestamp = GetCurrentTimestamp();
|
||||||
|
|
||||||
ExecutePlanIntoDestReceiver(plannedStmt, params, copyDest);
|
ExecutePlanIntoDestReceiver(plannedStmt, params, copyDest);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* EXPLAIN ANALYZE instrumentations. Calculating these are very light-weight,
|
||||||
|
* so always populate them regardless of EXPLAIN ANALYZE or not.
|
||||||
|
*/
|
||||||
|
long durationSeconds = 0.0;
|
||||||
|
int durationMicrosecs = 0;
|
||||||
|
TimestampDifference(startTimestamp, GetCurrentTimestamp(), &durationSeconds,
|
||||||
|
&durationMicrosecs);
|
||||||
|
subPlan->durationMillisecs = durationSeconds * 1000 * +durationMicrosecs * 10e-3;
|
||||||
|
|
||||||
|
subPlan->bytesSentPerWorker = RemoteFileDestReceiverBytesSent(copyDest);
|
||||||
|
subPlan->remoteWorkerCount = list_length(remoteWorkerNodeList);
|
||||||
|
subPlan->writeLocalFile = entry->writeLocalFile;
|
||||||
|
|
||||||
SubPlanLevel--;
|
SubPlanLevel--;
|
||||||
FreeExecutorState(estate);
|
FreeExecutorState(estate);
|
||||||
}
|
}
|
||||||
|
|
|
@ -277,6 +277,36 @@ ExplainSubPlans(DistributedPlan *distributedPlan, ExplainState *es)
|
||||||
es->indent += 3;
|
es->indent += 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (es->analyze)
|
||||||
|
{
|
||||||
|
if (es->timing)
|
||||||
|
{
|
||||||
|
ExplainPropertyFloat("Subplan Duration", "ms", subPlan->durationMillisecs,
|
||||||
|
2, es);
|
||||||
|
}
|
||||||
|
|
||||||
|
ExplainPropertyInteger("Intermediate Data Size", "bytes",
|
||||||
|
subPlan->bytesSentPerWorker, es);
|
||||||
|
|
||||||
|
StringInfo destination = makeStringInfo();
|
||||||
|
if (subPlan->remoteWorkerCount && subPlan->writeLocalFile)
|
||||||
|
{
|
||||||
|
appendStringInfo(destination, "Send to %d nodes, write locally",
|
||||||
|
subPlan->remoteWorkerCount);
|
||||||
|
}
|
||||||
|
else if (subPlan->writeLocalFile)
|
||||||
|
{
|
||||||
|
appendStringInfoString(destination, "Write locally");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
appendStringInfo(destination, "Send to %d nodes",
|
||||||
|
subPlan->remoteWorkerCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
ExplainPropertyText("Result destination", destination->data, es);
|
||||||
|
}
|
||||||
|
|
||||||
INSTR_TIME_SET_ZERO(planduration);
|
INSTR_TIME_SET_ZERO(planduration);
|
||||||
|
|
||||||
ExplainOnePlan(plan, into, es, queryString, params, NULL, &planduration);
|
ExplainOnePlan(plan, into, es, queryString, params, NULL, &planduration);
|
||||||
|
|
|
@ -53,6 +53,7 @@ extern DestReceiver * CreateRemoteFileDestReceiver(const char *resultId,
|
||||||
EState *executorState,
|
EState *executorState,
|
||||||
List *initialNodeList, bool
|
List *initialNodeList, bool
|
||||||
writeLocalFile);
|
writeLocalFile);
|
||||||
|
extern uint64 RemoteFileDestReceiverBytesSent(DestReceiver *destReceiver);
|
||||||
extern void SendQueryResultViaCopy(const char *resultId);
|
extern void SendQueryResultViaCopy(const char *resultId);
|
||||||
extern void ReceiveQueryResultViaCopy(const char *resultId);
|
extern void ReceiveQueryResultViaCopy(const char *resultId);
|
||||||
extern void RemoveIntermediateResultsDirectory(void);
|
extern void RemoveIntermediateResultsDirectory(void);
|
||||||
|
|
|
@ -466,6 +466,12 @@ typedef struct DistributedSubPlan
|
||||||
|
|
||||||
uint32 subPlanId;
|
uint32 subPlanId;
|
||||||
PlannedStmt *plan;
|
PlannedStmt *plan;
|
||||||
|
|
||||||
|
/* EXPLAIN ANALYZE instrumentations */
|
||||||
|
uint64 bytesSentPerWorker;
|
||||||
|
uint32 remoteWorkerCount;
|
||||||
|
double durationMillisecs;
|
||||||
|
bool writeLocalFile;
|
||||||
} DistributedSubPlan;
|
} DistributedSubPlan;
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -304,6 +304,29 @@ EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM distribute
|
||||||
Filter: (age = 20)
|
Filter: (age = 20)
|
||||||
(8 rows)
|
(8 rows)
|
||||||
|
|
||||||
|
EXPLAIN (ANALYZE ON, COSTS OFF, SUMMARY OFF, TIMING OFF)
|
||||||
|
WITH r AS ( SELECT random() z,* FROM distributed_table)
|
||||||
|
SELECT * FROM r WHERE z < 2;
|
||||||
|
QUERY PLAN
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
|
||||||
|
-> Distributed Subplan XXX_1
|
||||||
|
Intermediate Data Size: 40 bytes
|
||||||
|
Result destination: Write locally
|
||||||
|
-> Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
|
||||||
|
Task Count: 4
|
||||||
|
Tasks Shown: One of 4
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Seq Scan on distributed_table_1470001 distributed_table (actual rows=1 loops=1)
|
||||||
|
Task Count: 1
|
||||||
|
Tasks Shown: All
|
||||||
|
-> Task
|
||||||
|
Node: host=localhost port=xxxxx dbname=regression
|
||||||
|
-> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1)
|
||||||
|
Filter: (z < '2'::double precision)
|
||||||
|
(16 rows)
|
||||||
|
|
||||||
EXPLAIN (COSTS OFF) DELETE FROM distributed_table WHERE key = 1 AND age = 20;
|
EXPLAIN (COSTS OFF) DELETE FROM distributed_table WHERE key = 1 AND age = 20;
|
||||||
QUERY PLAN
|
QUERY PLAN
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
|
@ -2019,6 +2019,8 @@ WITH r AS (
|
||||||
SELECT count(distinct a) from r NATURAL JOIN ref_table;
|
SELECT count(distinct a) from r NATURAL JOIN ref_table;
|
||||||
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
|
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
|
||||||
-> Distributed Subplan XXX_1
|
-> Distributed Subplan XXX_1
|
||||||
|
Intermediate Data Size: 220 bytes
|
||||||
|
Result destination: Send to 2 nodes
|
||||||
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
|
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
|
||||||
Task Count: 4
|
Task Count: 4
|
||||||
Tasks Shown: One of 4
|
Tasks Shown: One of 4
|
||||||
|
@ -2063,6 +2065,8 @@ WHERE EXISTS(SELECT random() FROM dist_table NATURAL JOIN ref_table);
|
||||||
Aggregate (actual rows=1 loops=1)
|
Aggregate (actual rows=1 loops=1)
|
||||||
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
|
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
|
||||||
-> Distributed Subplan XXX_1
|
-> Distributed Subplan XXX_1
|
||||||
|
Intermediate Data Size: 140 bytes
|
||||||
|
Result destination: Send to 2 nodes
|
||||||
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
|
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
|
||||||
Task Count: 4
|
Task Count: 4
|
||||||
Tasks Shown: One of 4
|
Tasks Shown: One of 4
|
||||||
|
@ -2100,6 +2104,8 @@ WITH r AS (
|
||||||
SELECT count(distinct a2) FROM s;
|
SELECT count(distinct a2) FROM s;
|
||||||
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
|
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
|
||||||
-> Distributed Subplan XXX_1
|
-> Distributed Subplan XXX_1
|
||||||
|
Intermediate Data Size: 100 bytes
|
||||||
|
Result destination: Write locally
|
||||||
-> Custom Scan (Citus Adaptive) (actual rows=20 loops=1)
|
-> Custom Scan (Citus Adaptive) (actual rows=20 loops=1)
|
||||||
Task Count: 4
|
Task Count: 4
|
||||||
Tasks Shown: One of 4
|
Tasks Shown: One of 4
|
||||||
|
@ -2109,6 +2115,8 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
|
||||||
-> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1)
|
-> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1)
|
||||||
Filter: ((worker_hash(a) >= '-2147483648'::integer) AND (worker_hash(a) <= '-1073741825'::integer))
|
Filter: ((worker_hash(a) >= '-2147483648'::integer) AND (worker_hash(a) <= '-1073741825'::integer))
|
||||||
-> Distributed Subplan XXX_2
|
-> Distributed Subplan XXX_2
|
||||||
|
Intermediate Data Size: 220 bytes
|
||||||
|
Result destination: Write locally
|
||||||
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
|
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
|
||||||
Task Count: 1
|
Task Count: 1
|
||||||
Tasks Shown: All
|
Tasks Shown: All
|
||||||
|
|
|
@ -203,6 +203,10 @@ EXPLAIN (COSTS OFF) SELECT * FROM distributed_table WHERE key = 1 AND age = 20;
|
||||||
|
|
||||||
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM distributed_table WHERE key = 1 AND age = 20;
|
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM distributed_table WHERE key = 1 AND age = 20;
|
||||||
|
|
||||||
|
EXPLAIN (ANALYZE ON, COSTS OFF, SUMMARY OFF, TIMING OFF)
|
||||||
|
WITH r AS ( SELECT random() z,* FROM distributed_table)
|
||||||
|
SELECT * FROM r WHERE z < 2;
|
||||||
|
|
||||||
EXPLAIN (COSTS OFF) DELETE FROM distributed_table WHERE key = 1 AND age = 20;
|
EXPLAIN (COSTS OFF) DELETE FROM distributed_table WHERE key = 1 AND age = 20;
|
||||||
|
|
||||||
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) DELETE FROM distributed_table WHERE key = 1 AND age = 20;
|
EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) DELETE FROM distributed_table WHERE key = 1 AND age = 20;
|
||||||
|
|
Loading…
Reference in New Issue