diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index ef6a789b6..35f7178f7 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -692,6 +692,15 @@ AdaptiveExecutor(CitusScanState *scanState) /* we should only call this once before the scan finished */ Assert(!scanState->finishedRemoteScan); + /* Reset Task fields that are only valid for a single execution */ + Task *task = NULL; + foreach_ptr(task, taskList) + { + task->totalReceivedTupleData = 0; + task->fetchedExplainAnalyzePlacementIndex = 0; + task->fetchedExplainAnalyzePlan = NULL; + } + scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); @@ -3687,6 +3696,8 @@ ReceiveResults(WorkerSession *session, bool storeRows) */ Assert(EnableBinaryProtocol || !binaryResults); + uint64 tupleLibpqSize = 0; + for (uint32 rowIndex = 0; rowIndex < rowsProcessed; rowIndex++) { /* @@ -3731,21 +3742,7 @@ ReceiveResults(WorkerSession *session, bool storeRows) { executionStats->totalIntermediateResultSize += valueLength; } - - /* - * We use this to count the amount of data that has been - * received for EXPLAIN ANALYZE. - * Only EXPLAIN ANALYZE TupleDestination has originalTask - * defined. So that's why we check for it, otherwise we - * don't have to keep track of this data. - * The worker plan itself is also sent as a result in the - * same task. We filter this out by only counting the data - * from the first query. - */ - if (tupleDest->originalTask && queryIndex == 0) - { - tupleDest->originalTask->totalReceivedData += valueLength; - } + tupleLibpqSize += valueLength; } } @@ -3767,7 +3764,7 @@ ReceiveResults(WorkerSession *session, bool storeRows) tupleDest->putTuple(tupleDest, task, placementExecution->placementExecutionIndex, queryIndex, - heapTuple); + heapTuple, tupleLibpqSize); MemoryContextReset(rowContext); diff --git a/src/backend/distributed/executor/tuple_destination.c b/src/backend/distributed/executor/tuple_destination.c index f4f12fbd0..a6f5420d4 100644 --- a/src/backend/distributed/executor/tuple_destination.c +++ b/src/backend/distributed/executor/tuple_destination.c @@ -42,12 +42,12 @@ typedef struct TupleDestDestReceiver /* forward declarations for local functions */ static void TupleStoreTupleDestPutTuple(TupleDestination *self, Task *task, int placementIndex, int queryNumber, - HeapTuple heapTuple); + HeapTuple heapTuple, uint64 tupleLibpqSize); static TupleDesc TupleStoreTupleDestTupleDescForQuery(TupleDestination *self, int queryNumber); static void TupleDestNonePutTuple(TupleDestination *self, Task *task, int placementIndex, int queryNumber, - HeapTuple heapTuple); + HeapTuple heapTuple, uint64 tupleLibpqSize); static TupleDesc TupleDestNoneTupleDescForQuery(TupleDestination *self, int queryNumber); static void TupleDestDestReceiverStartup(DestReceiver *copyDest, int operation, TupleDesc inputTupleDesc); @@ -83,10 +83,11 @@ CreateTupleStoreTupleDest(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor static void TupleStoreTupleDestPutTuple(TupleDestination *self, Task *task, int placementIndex, int queryNumber, - HeapTuple heapTuple) + HeapTuple heapTuple, uint64 tupleLibpqSize) { TupleStoreTupleDestination *tupleDest = (TupleStoreTupleDestination *) self; tuplestore_puttuple(tupleDest->tupleStore, heapTuple); + task->totalReceivedTupleData += tupleLibpqSize; } @@ -127,7 +128,7 @@ CreateTupleDestNone(void) static void TupleDestNonePutTuple(TupleDestination *self, Task *task, int placementIndex, int queryNumber, - HeapTuple heapTuple) + HeapTuple heapTuple, uint64 tupleLibpqSize) { /* nothing to do */ } @@ -202,7 +203,7 @@ TupleDestDestReceiverReceive(TupleTableSlot *slot, HeapTuple heapTuple = ExecFetchSlotTuple(slot); #endif - tupleDest->putTuple(tupleDest, task, placementIndex, queryNumber, heapTuple); + tupleDest->putTuple(tupleDest, task, placementIndex, queryNumber, heapTuple, 0); return true; } diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 80b4c08de..629f30f1a 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -143,7 +143,7 @@ static TupleDestination * CreateExplainAnlyzeDestination(Task *task, TupleDestination *taskDest); static void ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task, int placementIndex, int queryNumber, - HeapTuple heapTuple); + HeapTuple heapTuple, uint64 tupleLibpqSize); static TupleDesc ExplainAnalyzeDestTupleDescForQuery(TupleDestination *self, int queryNumber); static char * WrapQueryForExplainAnalyze(const char *queryString, TupleDesc tupleDesc); @@ -156,8 +156,8 @@ static void ExplainOneQuery(Query *query, int cursorOptions, QueryEnvironment *queryEnv); static double elapsed_time(instr_time *starttime); static void ExplainPropertyBytes(const char *qlabel, int64 bytes, ExplainState *es); -static uint64 TaskReceivedData(Task *task); -static bool ShowReceivedData(CitusScanState *scanState, ExplainState *es); +static uint64 TaskReceivedTupleData(Task *task); +static bool ShowReceivedTupleData(CitusScanState *scanState, ExplainState *es); /* exports for SQL callable functions */ @@ -340,11 +340,12 @@ ExplainPropertyBytes(const char *qlabel, int64 bytes, ExplainState *es) /* - * ShowReceivedData returns true if explain should show received data. This is - * only the case when using EXPLAIN ANALYZE on queries that return rows. + * ShowReceivedTupleData returns true if explain should show received data. + * This is only the case when using EXPLAIN ANALYZE on queries that return + * rows. */ static bool -ShowReceivedData(CitusScanState *scanState, ExplainState *es) +ShowReceivedTupleData(CitusScanState *scanState, ExplainState *es) { TupleDesc tupDesc = ScanStateGetTupleDescriptor(scanState); return es->analyze && tupDesc != NULL && tupDesc->natts > 0; @@ -368,16 +369,16 @@ ExplainJob(CitusScanState *scanState, Job *job, ExplainState *es) ExplainOpenGroup("Job", "Job", true, es); ExplainPropertyInteger("Task Count", NULL, taskCount, es); - if (ShowReceivedData(scanState, es)) + if (ShowReceivedTupleData(scanState, es)) { Task *task = NULL; - uint64 totalReceivedDataForAllTasks = 0; + uint64 totalReceivedTupleDataForAllTasks = 0; foreach_ptr(task, taskList) { - totalReceivedDataForAllTasks += TaskReceivedData(task); + totalReceivedTupleDataForAllTasks += TaskReceivedTupleData(task); } - ExplainPropertyBytes("Data received from workers", - totalReceivedDataForAllTasks, + ExplainPropertyBytes("Tuple data received from nodes", + totalReceivedTupleDataForAllTasks, es); } @@ -433,19 +434,19 @@ ExplainJob(CitusScanState *scanState, Job *job, ExplainState *es) /* - * TaskReceivedData returns the amount of data that was received by the + * TaskReceivedTupleData returns the amount of data that was received by the * coordinator for the task. If it's a RETURNING DML task the value stored in - * totalReceivedData is not correct yet because it only counts the bytes for + * totalReceivedTupleData is not correct yet because it only counts the bytes for * one placement. */ static uint64 -TaskReceivedData(Task *task) +TaskReceivedTupleData(Task *task) { if (task->taskType == MODIFY_TASK) { - return task->totalReceivedData * list_length(task->taskPlacementList); + return task->totalReceivedTupleData * list_length(task->taskPlacementList); } - return task->totalReceivedData; + return task->totalReceivedTupleData; } @@ -699,10 +700,10 @@ ExplainTask(CitusScanState *scanState, Task *task, int placementIndex, ExplainPropertyText("Query", queryText, es); } - if (ShowReceivedData(scanState, es)) + if (ShowReceivedTupleData(scanState, es)) { - ExplainPropertyBytes("Data received from worker", - TaskReceivedData(task), + ExplainPropertyBytes("Tuple data received from node", + TaskReceivedTupleData(task), es); } @@ -1158,7 +1159,6 @@ CreateExplainAnlyzeDestination(Task *task, TupleDestination *taskDest) tupleDestination->pub.putTuple = ExplainAnalyzeDestPutTuple; tupleDestination->pub.tupleDescForQuery = ExplainAnalyzeDestTupleDescForQuery; - tupleDestination->pub.originalTask = task; return (TupleDestination *) tupleDestination; } @@ -1171,13 +1171,15 @@ CreateExplainAnlyzeDestination(Task *task, TupleDestination *taskDest) static void ExplainAnalyzeDestPutTuple(TupleDestination *self, Task *task, int placementIndex, int queryNumber, - HeapTuple heapTuple) + HeapTuple heapTuple, uint64 tupleLibpqSize) { ExplainAnalyzeDestination *tupleDestination = (ExplainAnalyzeDestination *) self; if (queryNumber == 0) { TupleDestination *originalTupDest = tupleDestination->originalTaskDestination; - originalTupDest->putTuple(originalTupDest, task, placementIndex, 0, heapTuple); + originalTupDest->putTuple(originalTupDest, task, placementIndex, 0, heapTuple, + tupleLibpqSize); + tupleDestination->originalTask->totalReceivedTupleData += tupleLibpqSize; } else if (queryNumber == 1) { diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index 0de85253b..e2a966f27 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -329,7 +329,7 @@ CopyNodeTask(COPYFUNC_ARGS) COPY_SCALAR_FIELD(parametersInQueryStringResolved); COPY_SCALAR_FIELD(tupleDest); COPY_SCALAR_FIELD(queryCount); - COPY_SCALAR_FIELD(totalReceivedData); + COPY_SCALAR_FIELD(totalReceivedTupleData); COPY_SCALAR_FIELD(fetchedExplainAnalyzePlacementIndex); COPY_STRING_FIELD(fetchedExplainAnalyzePlan); } diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 88299e192..360d5794a 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -338,14 +338,17 @@ typedef struct Task */ struct TupleDestination *tupleDest; + /* + * totalReceivedTupleData only counts the data for a single placement. So + * for RETURNING DML this is not really correct. This is used by + * EXPLAIN ANALYZE, to display the amount of received bytes. + */ + uint64 totalReceivedTupleData; + /* * EXPLAIN ANALYZE output fetched from worker. This is saved to be used later * by RemoteExplain(). - * - * totalReceivedData only counts the data for a single placement. So for - * RETURNING DML this is not really correct. */ - uint64 totalReceivedData; char *fetchedExplainAnalyzePlan; int fetchedExplainAnalyzePlacementIndex; } Task; diff --git a/src/include/distributed/tuple_destination.h b/src/include/distributed/tuple_destination.h index d57cdc3b7..24e4b4aad 100644 --- a/src/include/distributed/tuple_destination.h +++ b/src/include/distributed/tuple_destination.h @@ -32,11 +32,10 @@ struct TupleDestination /* putTuple implements custom processing of a tuple */ void (*putTuple)(TupleDestination *self, Task *task, int placementIndex, int queryNumber, - HeapTuple tuple); + HeapTuple tuple, uint64 tupleLibpqSize); /* tupleDescForQuery returns tuple descriptor for a query number. Can return NULL. */ TupleDesc (*tupleDescForQuery)(TupleDestination *self, int queryNumber); - Task *originalTask; }; extern TupleDestination * CreateTupleStoreTupleDest(Tuplestorestate *tupleStore, TupleDesc diff --git a/src/test/regress/expected/binary_protocol.out b/src/test/regress/expected/binary_protocol.out index e12de5f07..fe68a4d39 100644 --- a/src/test/regress/expected/binary_protocol.out +++ b/src/test/regress/expected/binary_protocol.out @@ -1,4 +1,4 @@ -SET citus.shard_count = 4; +SET citus.shard_count = 2; SET citus.next_shard_id TO 4754000; CREATE SCHEMA binary_protocol; SET search_path TO binary_protocol; @@ -48,6 +48,45 @@ SELECT id, id, id, id, id, 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 (10 rows) +-- EXPLAIN ANALYZE is currently forced to use text protocol. Once that is +-- changed the numbers reported should change. +EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE) SELECT id FROM t ORDER BY 1; + QUERY PLAN +--------------------------------------------------------------------- + Sort (actual rows=10 loops=1) + Sort Key: remote_scan.id + Sort Method: quicksort Memory: 25kB + -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) + Task Count: 2 + Tuple data received from nodes: 11 bytes + Tasks Shown: One of 2 + -> Task + Tuple data received from node: 8 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on t_4754000 t (actual rows=7 loops=1) +(11 rows) + +SET citus.explain_all_tasks TO ON; +EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE) SELECT id FROM t ORDER BY 1; + QUERY PLAN +--------------------------------------------------------------------- + Sort (actual rows=10 loops=1) + Sort Key: remote_scan.id + Sort Method: quicksort Memory: 25kB + -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) + Task Count: 2 + Tuple data received from nodes: 11 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 8 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on t_4754000 t (actual rows=7 loops=1) + -> Task + Tuple data received from node: 3 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on t_4754001 t (actual rows=3 loops=1) +(15 rows) + INSERT INTO t SELECT count(*) from t; INSERT INTO t (SELECT id+1 from t); SELECT * FROM t ORDER BY id; diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index 21da46798..b4cd9f860 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -296,10 +296,10 @@ EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM distribute --------------------------------------------------------------------- Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Data received from workers: 5 bytes + Tuple data received from nodes: 5 bytes Tasks Shown: All -> Task - Data received from worker: 5 bytes + Tuple data received from node: 5 bytes Node: host=localhost port=xxxxx dbname=regression -> Index Scan using distributed_table_pkey_1470001 on distributed_table_1470001 distributed_table (actual rows=1 loops=1) Index Cond: (key = 1) @@ -307,8 +307,8 @@ EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM distribute (10 rows) EXPLAIN (ANALYZE ON, COSTS OFF, SUMMARY OFF, TIMING OFF) -WITH r AS ( SELECT random() z,* FROM distributed_table) -SELECT 1 FROM r WHERE z < 2; +WITH r AS ( SELECT GREATEST(random(), 2) z,* FROM distributed_table) +SELECT 1 FROM r WHERE z < 3; QUERY PLAN --------------------------------------------------------------------- Custom Scan (Citus Adaptive) (actual rows=1 loops=1) @@ -317,20 +317,20 @@ SELECT 1 FROM r WHERE z < 2; Result destination: Write locally -> Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 4 - Data received from workers: 22 bytes + Tuple data received from nodes: 6 bytes Tasks Shown: One of 4 -> Task - Data received from worker: 22 bytes + Tuple data received from node: 6 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on distributed_table_1470001 distributed_table (actual rows=1 loops=1) Task Count: 1 - Data received from workers: 1 bytes + Tuple data received from nodes: 1 bytes Tasks Shown: All -> Task - Data received from worker: 1 bytes + Tuple data received from node: 1 bytes Node: host=localhost port=xxxxx dbname=regression -> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1) - Filter: (z < '2'::double precision) + Filter: (z < '3'::double precision) (20 rows) EXPLAIN (COSTS OFF) DELETE FROM distributed_table WHERE key = 1 AND age = 20; diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index 47f0121a6..0c80062e8 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -283,10 +283,10 @@ Sort (actual rows=50 loops=1) Group Key: remote_scan.l_quantity -> Custom Scan (Citus Adaptive) (actual rows=100 loops=1) Task Count: 2 - Data received from workers: 780 bytes + Tuple data received from nodes: 780 bytes Tasks Shown: One of 2 -> Task - Data received from worker: 390 bytes + Tuple data received from node: 390 bytes Node: host=localhost port=xxxxx dbname=regression -> HashAggregate (actual rows=50 loops=1) Group Key: l_quantity @@ -303,7 +303,7 @@ EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off) SELECT count(*) FROM t1 Aggregate (actual rows=1 loops=1) -> Custom Scan (Citus Adaptive) (actual rows=4 loops=1) Task Count: 4 - Data received from workers: 4 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: None, not supported for re-partition queries -> MapMergeJob Map Task Count: 3 @@ -327,11 +327,11 @@ Sort (actual rows=50 loops=1) -> Custom Scan (Citus Adaptive) (actual rows=100 loops=1) Output: remote_scan.l_quantity, remote_scan.count_quantity Task Count: 2 - Data received from workers: 780 bytes + Tuple data received from nodes: 780 bytes Tasks Shown: One of 2 -> Task Query: SELECT l_quantity, count(*) AS count_quantity FROM lineitem_290000 lineitem WHERE true GROUP BY l_quantity - Data received from worker: 390 bytes + Tuple data received from node: 390 bytes Node: host=localhost port=xxxxx dbname=regression -> HashAggregate (actual rows=50 loops=1) Output: l_quantity, count(*) @@ -971,16 +971,16 @@ Sort (actual rows=50 loops=1) Group Key: remote_scan.l_quantity -> Custom Scan (Citus Adaptive) (actual rows=100 loops=1) Task Count: 2 - Data received from workers: 780 bytes + Tuple data received from nodes: 780 bytes Tasks Shown: All -> Task - Data received from worker: 390 bytes + Tuple data received from node: 390 bytes Node: host=localhost port=xxxxx dbname=regression -> HashAggregate (actual rows=50 loops=1) Group Key: l_quantity -> Seq Scan on lineitem_290000 lineitem (actual rows=6000 loops=1) -> Task - Data received from worker: 390 bytes + Tuple data received from node: 390 bytes Node: host=localhost port=xxxxx dbname=regression -> HashAggregate (actual rows=50 loops=1) Group Key: l_quantity @@ -1247,10 +1247,10 @@ Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=18) EXPLAIN (ANALYZE ON, COSTS OFF, TIMING OFF, SUMMARY OFF) EXECUTE router_executor_query_param(5); Custom Scan (Citus Adaptive) (actual rows=3 loops=1) Task Count: 1 - Data received from workers: 0 bytes + Tuple data received from nodes: 15 bytes Tasks Shown: All -> Task - Data received from worker: 0 bytes + Tuple data received from node: 15 bytes Node: host=localhost port=xxxxx dbname=regression -> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem (actual rows=3 loops=1) Index Cond: (l_orderkey = 5) @@ -1806,10 +1806,10 @@ SELECT create_distributed_table('explain_analyze_test', 'a'); EXPLAIN :default_analyze_flags SELECT * FROM explain_analyze_test WHERE a = 1; Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Data received from workers: 8 bytes + Tuple data received from nodes: 8 bytes Tasks Shown: All -> Task - Data received from worker: 8 bytes + Tuple data received from node: 8 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=1 loops=1) Filter: (a = 1) @@ -1818,10 +1818,10 @@ EXPLAIN :default_analyze_flags SELECT count(*) FROM explain_analyze_test; Aggregate (actual rows=1 loops=1) -> Custom Scan (Citus Adaptive) (actual rows=4 loops=1) Task Count: 4 - Data received from workers: 4 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: One of 4 -> Task - Data received from worker: 1 bytes + Tuple data received from node: 1 bytes Node: host=localhost port=xxxxx dbname=regression -> Aggregate (actual rows=1 loops=1) -> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=1 loops=1) @@ -1829,10 +1829,10 @@ Aggregate (actual rows=1 loops=1) EXPLAIN :default_analyze_flags SELECT * FROM explain_analyze_test WHERE a = 10000; Custom Scan (Citus Adaptive) (actual rows=0 loops=1) Task Count: 1 - Data received from workers: 0 bytes + Tuple data received from nodes: 0 bytes Tasks Shown: All -> Task - Data received from worker: 0 bytes + Tuple data received from node: 0 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on explain_analyze_test_570012 explain_analyze_test (actual rows=0 loops=1) Filter: (a = 10000) @@ -1841,10 +1841,10 @@ Custom Scan (Citus Adaptive) (actual rows=0 loops=1) EXPLAIN :default_analyze_flags SELECT * FROM explain_analyze_test WHERE b = 'does not exist'; Custom Scan (Citus Adaptive) (actual rows=0 loops=1) Task Count: 4 - Data received from workers: 0 bytes + Tuple data received from nodes: 0 bytes Tasks Shown: One of 4 -> Task - Data received from worker: 0 bytes + Tuple data received from node: 0 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=0 loops=1) Filter: (b = 'does not exist'::text) @@ -1899,10 +1899,10 @@ ROLLBACK; EXPLAIN :default_analyze_flags UPDATE explain_analyze_test SET b = 'something' WHERE a = 10000 RETURNING *; Custom Scan (Citus Adaptive) (actual rows=0 loops=1) Task Count: 1 - Data received from workers: 0 bytes + Tuple data received from nodes: 0 bytes Tasks Shown: All -> Task - Data received from worker: 0 bytes + Tuple data received from node: 0 bytes Node: host=localhost port=xxxxx dbname=regression -> Update on explain_analyze_test_570012 explain_analyze_test (actual rows=0 loops=1) -> Seq Scan on explain_analyze_test_570012 explain_analyze_test (actual rows=0 loops=1) @@ -1912,10 +1912,10 @@ Custom Scan (Citus Adaptive) (actual rows=0 loops=1) EXPLAIN :default_analyze_flags UPDATE explain_analyze_test SET b = 'something' WHERE b = 'does not exist' RETURNING *; Custom Scan (Citus Adaptive) (actual rows=0 loops=1) Task Count: 4 - Data received from workers: 0 bytes + Tuple data received from nodes: 0 bytes Tasks Shown: One of 4 -> Task - Data received from worker: 0 bytes + Tuple data received from node: 0 bytes Node: host=localhost port=xxxxx dbname=regression -> Update on explain_analyze_test_570009 explain_analyze_test (actual rows=0 loops=1) -> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=0 loops=1) @@ -2040,11 +2040,11 @@ EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off, FORMAT JSON) SELECT * F "Distributed Query": { "Job": { "Task Count": 4, - "Data received from workers": "0 bytes", + "Tuple data received from nodes": "0 bytes", "Tasks Shown": "One of 4", "Tasks": [ { - "Data received from worker": "0 bytes", + "Tuple data received from node": "0 bytes", "Node": "host=localhost port=xxxxx dbname=regression", "Remote Plan": [ [ @@ -2137,11 +2137,11 @@ EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off, FORMAT XML) SELECT * FR 4 - 0 bytes + 0 bytes One of 4 - 0 bytes + 0 bytes host=localhost port=xxxxx dbname=regression @@ -2189,17 +2189,17 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Result destination: Send to 2 nodes -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task Count: 4 - Data received from workers: 21 bytes + Tuple data received from nodes: 21 bytes Tasks Shown: One of 4 -> Task - Data received from worker: 9 bytes + Tuple data received from node: 9 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_570017 dist_table (actual rows=4 loops=1) Task Count: 1 - Data received from workers: 2 bytes + Tuple data received from nodes: 2 bytes Tasks Shown: All -> Task - Data received from worker: 2 bytes + Tuple data received from node: 2 bytes Node: host=localhost port=xxxxx dbname=regression -> Aggregate (actual rows=1 loops=1) -> Hash Join (actual rows=10 loops=1) @@ -2213,10 +2213,10 @@ SELECT count(distinct a) FROM (SELECT GREATEST(random(), 2) r, a FROM dist_table Aggregate (actual rows=1 loops=1) -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task Count: 4 - Data received from workers: 11 bytes + Tuple data received from nodes: 11 bytes Tasks Shown: One of 4 -> Task - Data received from worker: 5 bytes + Tuple data received from node: 5 bytes Node: host=localhost port=xxxxx dbname=regression -> Group (actual rows=4 loops=1) Group Key: t.a @@ -2241,10 +2241,10 @@ Aggregate (actual rows=1 loops=1) Result destination: Send to 2 nodes -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task Count: 4 - Data received from workers: 10 bytes + Tuple data received from nodes: 10 bytes Tasks Shown: One of 4 -> Task - Data received from worker: 4 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Merge Join (actual rows=4 loops=1) Merge Cond: (dist_table.a = ref_table.a) @@ -2257,10 +2257,10 @@ Aggregate (actual rows=1 loops=1) Sort Method: quicksort Memory: 25kB -> Seq Scan on ref_table_570021 ref_table (actual rows=10 loops=1) Task Count: 4 - Data received from workers: 11 bytes + Tuple data received from nodes: 11 bytes Tasks Shown: One of 4 -> Task - Data received from worker: 5 bytes + Tuple data received from node: 5 bytes Node: host=localhost port=xxxxx dbname=regression -> HashAggregate (actual rows=4 loops=1) Group Key: dist_table.a @@ -2284,10 +2284,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Result destination: Write locally -> Custom Scan (Citus Adaptive) (actual rows=20 loops=1) Task Count: 4 - Data received from workers: 44 bytes + Tuple data received from nodes: 44 bytes Tasks Shown: One of 4 -> Task - Data received from worker: 20 bytes + Tuple data received from node: 20 bytes Node: host=localhost port=xxxxx dbname=regression -> Insert on dist_table_570017 citus_table_alias (actual rows=8 loops=1) -> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1) @@ -2297,17 +2297,17 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Result destination: Write locally -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task Count: 1 - Data received from workers: 28 bytes + Tuple data received from nodes: 28 bytes Tasks Shown: All -> Task - Data received from worker: 28 bytes + Tuple data received from node: 28 bytes Node: host=localhost port=xxxxx dbname=regression -> Function Scan on read_intermediate_result intermediate_result (actual rows=10 loops=1) Task Count: 1 - Data received from workers: 2 bytes + Tuple data received from nodes: 2 bytes Tasks Shown: All -> Task - Data received from worker: 2 bytes + Tuple data received from node: 2 bytes Node: host=localhost port=xxxxx dbname=regression -> Aggregate (actual rows=1 loops=1) -> Function Scan on read_intermediate_result intermediate_result (actual rows=10 loops=1) @@ -2323,42 +2323,253 @@ SET citus.shard_replication_factor = 2; CREATE TABLE dist_table_rep2(a int); SELECT create_distributed_table('dist_table_rep2', 'a'); -EXPLAIN :default_analyze_flags INSERT INTO dist_table_rep1 VALUES(1), (2), (3), (4) RETURNING *; -Custom Scan (Citus Adaptive) (actual rows=4 loops=1) +EXPLAIN :default_analyze_flags INSERT INTO dist_table_rep1 VALUES(1), (2), (3), (4), (10), (100) RETURNING *; +Custom Scan (Citus Adaptive) (actual rows=6 loops=1) Task Count: 2 - Data received from workers: 4 bytes + Tuple data received from nodes: 9 bytes Tasks Shown: One of 2 -> Task - Data received from worker: 3 bytes + Tuple data received from node: 5 bytes Node: host=localhost port=xxxxx dbname=regression - -> Insert on dist_table_rep1_570022 citus_table_alias (actual rows=3 loops=1) - -> Values Scan on "*VALUES*" (actual rows=3 loops=1) + -> Insert on dist_table_rep1_570022 citus_table_alias (actual rows=4 loops=1) + -> Values Scan on "*VALUES*" (actual rows=4 loops=1) EXPLAIN :default_analyze_flags SELECT * from dist_table_rep1; -Custom Scan (Citus Adaptive) (actual rows=4 loops=1) +Custom Scan (Citus Adaptive) (actual rows=6 loops=1) Task Count: 2 - Data received from workers: 4 bytes + Tuple data received from nodes: 9 bytes Tasks Shown: One of 2 -> Task - Data received from worker: 3 bytes + Tuple data received from node: 5 bytes Node: host=localhost port=xxxxx dbname=regression - -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=3 loops=1) -EXPLAIN :default_analyze_flags INSERT INTO dist_table_rep2 VALUES(1), (2), (3), (4) RETURNING *; -Custom Scan (Citus Adaptive) (actual rows=4 loops=1) + -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1) +EXPLAIN :default_analyze_flags INSERT INTO dist_table_rep2 VALUES(1), (2), (3), (4), (10), (100) RETURNING *; +Custom Scan (Citus Adaptive) (actual rows=6 loops=1) Task Count: 2 - Data received from workers: 8 bytes + Tuple data received from nodes: 18 bytes Tasks Shown: One of 2 -> Task - Data received from worker: 6 bytes + Tuple data received from node: 10 bytes Node: host=localhost port=xxxxx dbname=regression - -> Insert on dist_table_rep2_570024 citus_table_alias (actual rows=3 loops=1) - -> Values Scan on "*VALUES*" (actual rows=3 loops=1) + -> Insert on dist_table_rep2_570024 citus_table_alias (actual rows=4 loops=1) + -> Values Scan on "*VALUES*" (actual rows=4 loops=1) EXPLAIN :default_analyze_flags SELECT * from dist_table_rep2; -Custom Scan (Citus Adaptive) (actual rows=4 loops=1) +Custom Scan (Citus Adaptive) (actual rows=6 loops=1) Task Count: 2 - Data received from workers: 4 bytes + Tuple data received from nodes: 9 bytes Tasks Shown: One of 2 -> Task - Data received from worker: 3 bytes + Tuple data received from node: 5 bytes Node: host=localhost port=xxxxx dbname=regression - -> Seq Scan on dist_table_rep2_570024 dist_table_rep2 (actual rows=3 loops=1) + -> Seq Scan on dist_table_rep2_570024 dist_table_rep2 (actual rows=4 loops=1) +prepare p1 as SELECT * FROM dist_table_rep1; +EXPLAIN :default_analyze_flags EXECUTE p1; +Custom Scan (Citus Adaptive) (actual rows=6 loops=1) + Task Count: 2 + Tuple data received from nodes: 9 bytes + Tasks Shown: One of 2 + -> Task + Tuple data received from node: 5 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1) +EXPLAIN :default_analyze_flags EXECUTE p1; +Custom Scan (Citus Adaptive) (actual rows=6 loops=1) + Task Count: 2 + Tuple data received from nodes: 9 bytes + Tasks Shown: One of 2 + -> Task + Tuple data received from node: 5 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1) +EXPLAIN :default_analyze_flags EXECUTE p1; +Custom Scan (Citus Adaptive) (actual rows=6 loops=1) + Task Count: 2 + Tuple data received from nodes: 9 bytes + Tasks Shown: One of 2 + -> Task + Tuple data received from node: 5 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1) +EXPLAIN :default_analyze_flags EXECUTE p1; +Custom Scan (Citus Adaptive) (actual rows=6 loops=1) + Task Count: 2 + Tuple data received from nodes: 9 bytes + Tasks Shown: One of 2 + -> Task + Tuple data received from node: 5 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1) +EXPLAIN :default_analyze_flags EXECUTE p1; +Custom Scan (Citus Adaptive) (actual rows=6 loops=1) + Task Count: 2 + Tuple data received from nodes: 9 bytes + Tasks Shown: One of 2 + -> Task + Tuple data received from node: 5 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1) +EXPLAIN :default_analyze_flags EXECUTE p1; +Custom Scan (Citus Adaptive) (actual rows=6 loops=1) + Task Count: 2 + Tuple data received from nodes: 9 bytes + Tasks Shown: One of 2 + -> Task + Tuple data received from node: 5 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1) +prepare p2 AS SELECT * FROM dist_table_rep1 WHERE a = $1; +EXPLAIN :default_analyze_flags EXECUTE p2(1); +Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + Task Count: 1 + Tuple data received from nodes: 1 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 1 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) + Filter: (a = 1) + Rows Removed by Filter: 3 +EXPLAIN :default_analyze_flags EXECUTE p2(1); +Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + Task Count: 1 + Tuple data received from nodes: 1 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 1 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) + Filter: (a = 1) + Rows Removed by Filter: 3 +EXPLAIN :default_analyze_flags EXECUTE p2(1); +Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + Task Count: 1 + Tuple data received from nodes: 1 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 1 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) + Filter: (a = 1) + Rows Removed by Filter: 3 +EXPLAIN :default_analyze_flags EXECUTE p2(1); +Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + Task Count: 1 + Tuple data received from nodes: 1 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 1 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) + Filter: (a = 1) + Rows Removed by Filter: 3 +EXPLAIN :default_analyze_flags EXECUTE p2(1); +Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + Task Count: 1 + Tuple data received from nodes: 1 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 1 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) + Filter: (a = 1) + Rows Removed by Filter: 3 +EXPLAIN :default_analyze_flags EXECUTE p2(1); +Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + Task Count: 1 + Tuple data received from nodes: 1 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 1 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) + Filter: (a = 1) + Rows Removed by Filter: 3 +EXPLAIN :default_analyze_flags EXECUTE p2(10); +Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + Task Count: 1 + Tuple data received from nodes: 2 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 2 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) + Filter: (a = 10) + Rows Removed by Filter: 3 +EXPLAIN :default_analyze_flags EXECUTE p2(100); +Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + Task Count: 1 + Tuple data received from nodes: 3 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 3 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on dist_table_rep1_570023 dist_table_rep1 (actual rows=1 loops=1) + Filter: (a = 100) + Rows Removed by Filter: 1 +prepare p3 AS SELECT * FROM dist_table_rep1 WHERE a = 1; +EXPLAIN :default_analyze_flags EXECUTE p3; +Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + Task Count: 1 + Tuple data received from nodes: 1 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 1 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) + Filter: (a = 1) + Rows Removed by Filter: 3 +EXPLAIN :default_analyze_flags EXECUTE p3; +Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + Task Count: 1 + Tuple data received from nodes: 1 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 1 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) + Filter: (a = 1) + Rows Removed by Filter: 3 +EXPLAIN :default_analyze_flags EXECUTE p3; +Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + Task Count: 1 + Tuple data received from nodes: 1 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 1 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) + Filter: (a = 1) + Rows Removed by Filter: 3 +EXPLAIN :default_analyze_flags EXECUTE p3; +Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + Task Count: 1 + Tuple data received from nodes: 1 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 1 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) + Filter: (a = 1) + Rows Removed by Filter: 3 +EXPLAIN :default_analyze_flags EXECUTE p3; +Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + Task Count: 1 + Tuple data received from nodes: 1 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 1 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) + Filter: (a = 1) + Rows Removed by Filter: 3 +EXPLAIN :default_analyze_flags EXECUTE p3; +Custom Scan (Citus Adaptive) (actual rows=1 loops=1) + Task Count: 1 + Tuple data received from nodes: 1 bytes + Tasks Shown: All + -> Task + Tuple data received from node: 1 bytes + Node: host=localhost port=xxxxx dbname=regression + -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) + Filter: (a = 1) + Rows Removed by Filter: 3 DROP TABLE dist_table_rep1, dist_table_rep2; diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 3988f0a8b..99d1e9daf 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -106,7 +106,7 @@ test: multi_query_directory_cleanup test: multi_task_assignment_policy multi_cross_shard test: multi_utility_statements test: multi_dropped_column_aliases foreign_key_restriction_enforcement -test: multi_binary_master_copy_format +test: multi_binary_master_copy_format binary_protocol # ---------- # Parallel TPC-H tests to check our distributed execution behavior diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index 24f3015bf..8f01a4485 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -204,8 +204,8 @@ EXPLAIN (COSTS OFF) SELECT * FROM distributed_table WHERE key = 1 AND age = 20; EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM distributed_table WHERE key = 1 AND age = 20; EXPLAIN (ANALYZE ON, COSTS OFF, SUMMARY OFF, TIMING OFF) -WITH r AS ( SELECT random() z,* FROM distributed_table) -SELECT 1 FROM r WHERE z < 2; +WITH r AS ( SELECT GREATEST(random(), 2) z,* FROM distributed_table) +SELECT 1 FROM r WHERE z < 3; EXPLAIN (COSTS OFF) DELETE FROM distributed_table WHERE key = 1 AND age = 20; diff --git a/src/test/regress/sql/multi_explain.sql b/src/test/regress/sql/multi_explain.sql index 86b6e0fc2..3f315e524 100644 --- a/src/test/regress/sql/multi_explain.sql +++ b/src/test/regress/sql/multi_explain.sql @@ -881,10 +881,36 @@ SET citus.shard_replication_factor = 2; CREATE TABLE dist_table_rep2(a int); SELECT create_distributed_table('dist_table_rep2', 'a'); -EXPLAIN :default_analyze_flags INSERT INTO dist_table_rep1 VALUES(1), (2), (3), (4) RETURNING *; +EXPLAIN :default_analyze_flags INSERT INTO dist_table_rep1 VALUES(1), (2), (3), (4), (10), (100) RETURNING *; EXPLAIN :default_analyze_flags SELECT * from dist_table_rep1; -EXPLAIN :default_analyze_flags INSERT INTO dist_table_rep2 VALUES(1), (2), (3), (4) RETURNING *; +EXPLAIN :default_analyze_flags INSERT INTO dist_table_rep2 VALUES(1), (2), (3), (4), (10), (100) RETURNING *; EXPLAIN :default_analyze_flags SELECT * from dist_table_rep2; +prepare p1 as SELECT * FROM dist_table_rep1; +EXPLAIN :default_analyze_flags EXECUTE p1; +EXPLAIN :default_analyze_flags EXECUTE p1; +EXPLAIN :default_analyze_flags EXECUTE p1; +EXPLAIN :default_analyze_flags EXECUTE p1; +EXPLAIN :default_analyze_flags EXECUTE p1; +EXPLAIN :default_analyze_flags EXECUTE p1; + +prepare p2 AS SELECT * FROM dist_table_rep1 WHERE a = $1; +EXPLAIN :default_analyze_flags EXECUTE p2(1); +EXPLAIN :default_analyze_flags EXECUTE p2(1); +EXPLAIN :default_analyze_flags EXECUTE p2(1); +EXPLAIN :default_analyze_flags EXECUTE p2(1); +EXPLAIN :default_analyze_flags EXECUTE p2(1); +EXPLAIN :default_analyze_flags EXECUTE p2(1); +EXPLAIN :default_analyze_flags EXECUTE p2(10); +EXPLAIN :default_analyze_flags EXECUTE p2(100); + +prepare p3 AS SELECT * FROM dist_table_rep1 WHERE a = 1; +EXPLAIN :default_analyze_flags EXECUTE p3; +EXPLAIN :default_analyze_flags EXECUTE p3; +EXPLAIN :default_analyze_flags EXECUTE p3; +EXPLAIN :default_analyze_flags EXECUTE p3; +EXPLAIN :default_analyze_flags EXECUTE p3; +EXPLAIN :default_analyze_flags EXECUTE p3; + DROP TABLE dist_table_rep1, dist_table_rep2;