From bb5c4941047d6bf11266778cde51a65f3b705fd0 Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Wed, 1 Sep 2021 12:44:51 +0200 Subject: [PATCH] Enable binary encoding by default on PG14 Since PG14 we can now use binary encoding for arrays and composite types that contain user defined types. This was fixed in this commit in Postgres: https://github.com/postgres/postgres/commit/670c0a1d474bf296dbcc1d6de912d4841f2ed643 This change starts using that knowledge, by not necessarily falling back to text encoding anymore for those types. While doing this and testing a bit more I found various cases where binary encoding would fail that our checks didn't cover. This fixes those cases and adds tests for those. It also fixes EXPLAIN ANALYZE never using binary encoding, which was a leftover of workaround that was not necessary anymore. Finally, it changes the default for both `citus.enable_binary_protocol` and `citus.binary_worker_copy_format` to `true` for PG14 and up. In our cloud offering `binary_worker_copy_format` already was true by default. `enable_binary_protocol` had some bug with MX and user defined types, this bug was fixed by the above mentioned fixes. --- src/backend/distributed/commands/multi_copy.c | 83 +++++++- .../distributed/executor/adaptive_executor.c | 12 +- src/backend/distributed/shared_library_init.c | 8 + .../worker/worker_partition_protocol.c | 4 + src/test/regress/expected/binary_protocol.out | 181 ++++++++++++++---- .../regress/expected/intermediate_results.out | 2 +- .../expected/limit_intermediate_size.out | 4 + .../expected/local_shard_execution.out | 14 +- src/test/regress/expected/multi_explain.out | 178 ++++++++--------- .../multi_mx_function_call_delegation.out | 2 - .../multi_mx_function_call_delegation_0.out | 14 +- src/test/regress/expected/pg13.out | 6 +- src/test/regress/expected/single_node.out | 6 +- .../expected/subquery_complex_target_list.out | 9 +- ...rker_disable_binary_worker_copy_format.out | 20 ++ src/test/regress/pg_regress_multi.pl | 2 + src/test/regress/sql/binary_protocol.sql | 54 ++++-- src/test/regress/sql/intermediate_results.sql | 2 +- .../regress/sql/limit_intermediate_size.sql | 5 + .../regress/sql/local_shard_execution.sql | 2 + src/test/regress/sql/multi_explain.sql | 3 + .../sql/multi_mx_function_call_delegation.sql | 4 - src/test/regress/sql/pg13.sql | 3 + src/test/regress/sql/single_node.sql | 3 + .../sql/subquery_complex_target_list.sql | 9 + ...rker_disable_binary_worker_copy_format.sql | 8 + src/test/regress/worker_schedule | 1 + 27 files changed, 459 insertions(+), 180 deletions(-) create mode 100644 src/test/regress/expected/worker_disable_binary_worker_copy_format.out create mode 100644 src/test/regress/sql/worker_disable_binary_worker_copy_format.sql diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 993934d3d..eb1bf4d31 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -102,6 +102,7 @@ #include "libpq/pqformat.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" +#include "parser/parse_type.h" #if PG_VERSION_NUM >= PG_VERSION_13 #include "tcop/cmdtag.h" #endif @@ -998,9 +999,8 @@ CanUseBinaryCopyFormatForTargetList(List *targetEntryList) /* * CanUseBinaryCopyFormatForType determines whether it is safe to use the - * binary copy format for the given type. The binary copy format cannot - * be used for arrays or composite types that contain user-defined types, - * or when there is no binary output function defined. + * binary copy format for the given type. See contents of the function for + * details of when it's safe to use binary copy. */ bool CanUseBinaryCopyFormatForType(Oid typeId) @@ -1015,14 +1015,79 @@ CanUseBinaryCopyFormatForType(Oid typeId) return false; } - if (typeId >= FirstNormalObjectId) + /* + * A row type can contain any types, possibly types that don't have + * the binary input and output functions defined. + */ + if (type_is_rowtype(typeId)) { - char typeCategory = '\0'; - bool typePreferred = false; + /* + * TODO: Inspect the types inside the record and check if all of them + * can be binary encoded. If so, it's safe to use binary encoding. + * + * IMPORTANT: When implementing this todo keep the following in mind: + * + * In PG versions before PG14 the record_recv function would error out + * more than necessary. + * + * It errors out when any of the columns in the row have a type oid + * that doesn't match with the oid in the received data. This happens + * pretty much always for non built in types, because their oids differ + * between postgres intallations. So for those Postgres versions we + * would need a check like the following for each column: + * + * if (columnType >= FirstNormalObjectId) { + * return false + * } + */ + return false; + } - get_type_category_preferred(typeId, &typeCategory, &typePreferred); - if (typeCategory == TYPCATEGORY_ARRAY || - typeCategory == TYPCATEGORY_COMPOSITE) + HeapTuple typeTup = typeidType(typeId); + Form_pg_type type = (Form_pg_type) GETSTRUCT(typeTup); + Oid elementType = type->typelem; +#if PG_VERSION_NUM < PG_VERSION_14 + char typeCategory = type->typcategory; +#endif + ReleaseSysCache(typeTup); + +#if PG_VERSION_NUM < PG_VERSION_14 + + /* + * In PG versions before PG14 the array_recv function would error out more + * than necessary. + * + * It errors out when the element type its oids don't match with the oid in + * the received data. This happens pretty much always for non built in + * types, because their oids differ between postgres intallations. So we + * skip binary encoding when the element type is a non built in type. + */ + if (typeCategory == TYPCATEGORY_ARRAY && elementType >= FirstNormalObjectId) + { + return false; + } +#endif + + /* + * Any type that is a wrapper around an element type (e.g. arrays and + * ranges) require the element type to also has support for binary + * encoding. + */ + if (elementType != InvalidOid) + { + if (!CanUseBinaryCopyFormatForType(elementType)) + { + return false; + } + } + + /* + * For domains, make sure that the underlying type can be binary copied. + */ + Oid baseTypeId = getBaseType(typeId); + if (typeId != baseTypeId) + { + if (!CanUseBinaryCopyFormatForType(baseTypeId)) { return false; } diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index ed4ad6b07..21c1b4e1e 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -476,7 +476,11 @@ struct TaskPlacementExecution; /* GUC, determining whether Citus opens 1 connection per task */ bool ForceMaxQueryParallelization = false; int MaxAdaptiveExecutorPoolSize = 16; +#if PG_VERSION_NUM >= PG_VERSION_14 +bool EnableBinaryProtocol = true; +#else bool EnableBinaryProtocol = false; +#endif /* GUC, number of ms to wait between opening connections to the same worker */ int ExecutorSlowStartInterval = 10; @@ -2023,13 +2027,7 @@ SetAttributeInputMetadata(DistributedExecution *execution, { attInMetadata = NULL; } - /* - * We only allow binary results when queryCount is 1, because we - * cannot use binary results with SendRemoteCommand. Which must be - * used if queryCount is larger than 1. - */ - else if (EnableBinaryProtocol && queryCount == 1 && - CanUseBinaryCopyFormat(tupleDescriptor)) + else if (EnableBinaryProtocol && CanUseBinaryCopyFormat(tupleDescriptor)) { attInMetadata = TupleDescGetAttBinaryInMetadata(tupleDescriptor); shardCommandExecution->binaryResults = true; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index e8e6335e5..c37b8a626 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -575,7 +575,11 @@ RegisterCitusConfigVariables(void) "in PostgreSQL's binary serialization format when " "joining large tables."), &BinaryWorkerCopyFormat, +#if PG_VERSION_NUM >= PG_VERSION_14 + true, +#else false, +#endif PGC_SIGHUP, GUC_STANDARD, NULL, NULL, NULL); @@ -742,7 +746,11 @@ RegisterCitusConfigVariables(void) "Enables communication between nodes using binary protocol when possible"), NULL, &EnableBinaryProtocol, +#if PG_VERSION_NUM >= PG_VERSION_14 + true, +#else false, +#endif PGC_USERSET, GUC_STANDARD, NULL, NULL, NULL); diff --git a/src/backend/distributed/worker/worker_partition_protocol.c b/src/backend/distributed/worker/worker_partition_protocol.c index 7dc6c75ee..6f6a07f04 100644 --- a/src/backend/distributed/worker/worker_partition_protocol.c +++ b/src/backend/distributed/worker/worker_partition_protocol.c @@ -49,7 +49,11 @@ /* Config variables managed via guc.c */ +#if PG_VERSION_NUM >= PG_VERSION_14 +bool BinaryWorkerCopyFormat = true; /* binary format for copying between workers */ +#else bool BinaryWorkerCopyFormat = false; /* binary format for copying between workers */ +#endif int PartitionBufferSize = 16384; /* total partitioning buffer size in KB */ /* Local variables */ diff --git a/src/test/regress/expected/binary_protocol.out b/src/test/regress/expected/binary_protocol.out index fe68a4d39..607f3d89d 100644 --- a/src/test/regress/expected/binary_protocol.out +++ b/src/test/regress/expected/binary_protocol.out @@ -1,7 +1,7 @@ SET citus.shard_count = 2; SET citus.next_shard_id TO 4754000; CREATE SCHEMA binary_protocol; -SET search_path TO binary_protocol; +SET search_path TO binary_protocol, public; SET citus.enable_binary_protocol = TRUE; CREATE TABLE t(id int); SELECT create_distributed_table('t', 'id'); @@ -48,8 +48,6 @@ SELECT id, id, id, id, id, 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 (10 rows) --- EXPLAIN ANALYZE is currently forced to use text protocol. Once that is --- changed the numbers reported should change. EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE) SELECT id FROM t ORDER BY 1; QUERY PLAN --------------------------------------------------------------------- @@ -58,10 +56,10 @@ EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE) SELECT id FROM Sort Method: quicksort Memory: 25kB -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task Count: 2 - Tuple data received from nodes: 11 bytes + Tuple data received from nodes: 40 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 8 bytes + Tuple data received from node: 28 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on t_4754000 t (actual rows=7 loops=1) (11 rows) @@ -75,14 +73,14 @@ EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE) SELECT id FROM Sort Method: quicksort Memory: 25kB -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task Count: 2 - Tuple data received from nodes: 11 bytes + Tuple data received from nodes: 40 bytes Tasks Shown: All -> Task - Tuple data received from node: 8 bytes + Tuple data received from node: 28 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on t_4754000 t (actual rows=7 loops=1) -> Task - Tuple data received from node: 3 bytes + Tuple data received from node: 12 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on t_4754001 t (actual rows=3 loops=1) (15 rows) @@ -123,7 +121,7 @@ CREATE TYPE composite_type AS ( CREATE TABLE composite_type_table ( id bigserial, - col composite_type[] + col composite_type ); SELECT create_distributed_table('composite_type_table', 'id'); create_distributed_table @@ -131,33 +129,81 @@ SELECT create_distributed_table('composite_type_table', 'id'); (1 row) -INSERT INTO composite_type_table(col) VALUES (ARRAY[(1, 2)::composite_type]); -SELECT * FROM composite_type_table; - id | col ---------------------------------------------------------------------- - 1 | {"(1,2)"} -(1 row) - CREATE TYPE nested_composite_type AS ( a composite_type, b composite_type ); -CREATE TABLE nested_composite_type_table -( - id bigserial, - col nested_composite_type -); -SELECT create_distributed_table('nested_composite_type_table', 'id'); - create_distributed_table +select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.composite_type_domain AS binary_protocol.composite_type$$); + run_command_on_master_and_workers --------------------------------------------------------------------- (1 row) -INSERT INTO nested_composite_type_table(col) VALUES (((1, 2), (3,4))::nested_composite_type); -SELECT * FROM nested_composite_type_table; - id | col +select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.nested_composite_type_domain AS binary_protocol.nested_composite_type$$); + run_command_on_master_and_workers --------------------------------------------------------------------- - 1 | ("(1,2)","(3,4)") + +(1 row) + +INSERT INTO composite_type_table(col) VALUES ((1, 2)::composite_type); +SELECT col FROM composite_type_table; + col +--------------------------------------------------------------------- + (1,2) +(1 row) + +SELECT col::composite_type_domain FROM composite_type_table; + col +--------------------------------------------------------------------- + (1,2) +(1 row) + +SELECT (col, col) FROM composite_type_table; + row +--------------------------------------------------------------------- + ("(1,2)","(1,2)") +(1 row) + +SELECT (col, col)::nested_composite_type FROM composite_type_table; + row +--------------------------------------------------------------------- + ("(1,2)","(1,2)") +(1 row) + +SELECT (col, col)::nested_composite_type_domain FROM composite_type_table; + row +--------------------------------------------------------------------- + ("(1,2)","(1,2)") +(1 row) + +SELECT ARRAY[col] FROM composite_type_table; + array +--------------------------------------------------------------------- + {"(1,2)"} +(1 row) + +SELECT ARRAY[col::composite_type_domain] FROM composite_type_table; + array +--------------------------------------------------------------------- + {"(1,2)"} +(1 row) + +SELECT ARRAY[(col, col)] FROM composite_type_table; + array +--------------------------------------------------------------------- + {"(\"(1,2)\",\"(1,2)\")"} +(1 row) + +SELECT ARRAY[(col, col)::nested_composite_type] FROM composite_type_table; + array +--------------------------------------------------------------------- + {"(\"(1,2)\",\"(1,2)\")"} +(1 row) + +SELECT ARRAY[(col, col)::nested_composite_type_domain] FROM composite_type_table; + array +--------------------------------------------------------------------- + {"(\"(1,2)\",\"(1,2)\")"} (1 row) CREATE TABLE binaryless_builtin ( @@ -170,11 +216,81 @@ SELECT create_reference_table('binaryless_builtin'); (1 row) -INSERT INTO binaryless_builtin VALUES ('user postgres=r/postgres', 'test'); -SELECT * FROM binaryless_builtin; - col1 | col2 +CREATE TYPE binaryless_composite_type AS ( + a aclitem, + b aclitem +); +select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.binaryless_domain AS aclitem$$); + run_command_on_master_and_workers --------------------------------------------------------------------- - postgres=r/postgres | test + +(1 row) + +select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.binaryless_composite_domain AS binary_protocol.binaryless_composite_type$$); + run_command_on_master_and_workers +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO binaryless_builtin VALUES ('user postgres=r/postgres', 'test'); +SELECT col1 FROM binaryless_builtin; + col1 +--------------------------------------------------------------------- + postgres=r/postgres +(1 row) + +SELECT col1::binaryless_domain FROM binaryless_builtin; + col1 +--------------------------------------------------------------------- + postgres=r/postgres +(1 row) + +SELECT (col1, col1) FROM binaryless_builtin; + row +--------------------------------------------------------------------- + (postgres=r/postgres,postgres=r/postgres) +(1 row) + +SELECT (col1, col1)::binaryless_composite_type FROM binaryless_builtin; + row +--------------------------------------------------------------------- + (postgres=r/postgres,postgres=r/postgres) +(1 row) + +SELECT (col1, col1)::binaryless_composite_domain FROM binaryless_builtin; + row +--------------------------------------------------------------------- + (postgres=r/postgres,postgres=r/postgres) +(1 row) + +SELECT ARRAY[col1] FROM binaryless_builtin; + array +--------------------------------------------------------------------- + {postgres=r/postgres} +(1 row) + +SELECT ARRAY[col1::binaryless_domain] FROM binaryless_builtin; + array +--------------------------------------------------------------------- + {postgres=r/postgres} +(1 row) + +SELECT ARRAY[(col1, col1)] FROM binaryless_builtin; + array +--------------------------------------------------------------------- + {"(postgres=r/postgres,postgres=r/postgres)"} +(1 row) + +SELECT ARRAY[(col1, col1)::binaryless_composite_type] FROM binaryless_builtin; + array +--------------------------------------------------------------------- + {"(postgres=r/postgres,postgres=r/postgres)"} +(1 row) + +SELECT ARRAY[(col1, col1)::binaryless_composite_domain] FROM binaryless_builtin; + array +--------------------------------------------------------------------- + {"(postgres=r/postgres,postgres=r/postgres)"} (1 row) CREATE TABLE test_table_1(id int, val1 int); @@ -203,6 +319,5 @@ ORDER BY 1, 2; 3 | 3 (3 rows) -\set VERBOSITY terse +SET client_min_messages TO WARNING; DROP SCHEMA binary_protocol CASCADE; -NOTICE: drop cascades to 8 other objects diff --git a/src/test/regress/expected/intermediate_results.out b/src/test/regress/expected/intermediate_results.out index 77b42ae08..c510b07b3 100644 --- a/src/test/regress/expected/intermediate_results.out +++ b/src/test/regress/expected/intermediate_results.out @@ -383,7 +383,7 @@ SELECT broadcast_intermediate_result('stored_squares_1', -- query the intermediate result in a router query using text format SELECT * FROM interesting_squares JOIN ( SELECT * FROM - read_intermediate_results(ARRAY['stored_squares_1', 'stored_squares_2'], 'binary') AS res (x int, x2 int, z intermediate_results.square_type) + read_intermediate_results(ARRAY['stored_squares_1', 'stored_squares_2'], 'text') AS res (x int, x2 int, z intermediate_results.square_type) ) squares ON (squares.x::text = interested_in) WHERE user_id = 'jon' ORDER BY 1,2; user_id | interested_in | x | x2 | z diff --git a/src/test/regress/expected/limit_intermediate_size.out b/src/test/regress/expected/limit_intermediate_size.out index d4c60d78f..662ce0e05 100644 --- a/src/test/regress/expected/limit_intermediate_size.out +++ b/src/test/regress/expected/limit_intermediate_size.out @@ -1,4 +1,8 @@ SET citus.enable_repartition_joins to ON; +-- The intermediate result limits chosen below are based on text sizes of the +-- intermediate results. This is a no-op for PG_VERSION_NUM < 14, because the +-- default is false there. +SET citus.enable_binary_protocol = FALSE; SET citus.max_intermediate_result_size TO 2; -- should fail because the copy size is ~4kB for each cte WITH cte AS MATERIALIZED diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index d248fa6de..c104d2e75 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -287,6 +287,8 @@ NOTICE: executing the command locally: INSERT INTO local_shard_execution.distri INSERT INTO distributed_table SELECT 1, '1',15 FROM distributed_table WHERE key = 2 LIMIT 1 ON CONFLICT DO NOTHING; -- sanity check: multi-shard INSERT..SELECT pushdown goes through distributed execution INSERT INTO distributed_table SELECT * FROM distributed_table ON CONFLICT DO NOTHING; +-- Ensure tuple data in explain analyze output is the same on all PG versions +SET citus.enable_binary_protocol = TRUE; -- EXPLAIN for local execution just works fine -- though going through distributed execution EXPLAIN (COSTS OFF) SELECT * FROM distributed_table WHERE key = 1 AND age = 20; @@ -307,10 +309,10 @@ EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM distribute --------------------------------------------------------------------- Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 5 bytes + Tuple data received from nodes: 14 bytes Tasks Shown: All -> Task - Tuple data received from node: 5 bytes + Tuple data received from node: 14 bytes Node: host=localhost port=xxxxx dbname=regression -> Index Scan using distributed_table_pkey_1470001 on distributed_table_1470001 distributed_table (actual rows=1 loops=1) Index Cond: (key = 1) @@ -328,17 +330,17 @@ SELECT 1 FROM r WHERE z < 3; Result destination: Write locally -> Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 4 - Tuple data received from nodes: 6 bytes + Tuple data received from nodes: 22 bytes Tasks Shown: One of 4 -> Task - Tuple data received from node: 6 bytes + Tuple data received from node: 22 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on distributed_table_1470001 distributed_table (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1) Filter: (z < '3'::double precision) diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index 7eb3eb0d4..140ce1445 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -5,6 +5,8 @@ SET citus.next_shard_id TO 570000; \a\t SET citus.explain_distributed_queries TO on; SET citus.enable_repartition_joins to ON; +-- Ensure tuple data in explain analyze output is the same on all PG versions +SET citus.enable_binary_protocol = TRUE; -- Function that parses explain output as JSON CREATE FUNCTION explain_json(query text) RETURNS jsonb @@ -274,10 +276,10 @@ Sort (actual rows=50 loops=1) Group Key: remote_scan.l_quantity -> Custom Scan (Citus Adaptive) (actual rows=100 loops=1) Task Count: 2 - Tuple data received from nodes: 780 bytes + Tuple data received from nodes: 1800 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 390 bytes + Tuple data received from node: 900 bytes Node: host=localhost port=xxxxx dbname=regression -> HashAggregate (actual rows=50 loops=1) Group Key: l_quantity @@ -294,7 +296,7 @@ EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off) SELECT count(*) FROM t1 Aggregate (actual rows=1 loops=1) -> Custom Scan (Citus Adaptive) (actual rows=4 loops=1) Task Count: 4 - Tuple data received from nodes: 4 bytes + Tuple data received from nodes: 32 bytes Tasks Shown: None, not supported for re-partition queries -> MapMergeJob Map Task Count: 3 @@ -313,7 +315,7 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) -> Aggregate (actual rows=1 loops=1) -> Custom Scan (Citus Adaptive) (actual rows=4 loops=1) Task Count: 4 - Tuple data received from nodes: 4 bytes + Tuple data received from nodes: 32 bytes Tasks Shown: None, not supported for re-partition queries -> MapMergeJob Map Task Count: 3 @@ -322,10 +324,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Map Task Count: 3 Merge Task Count: 4 Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 8 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 8 bytes Node: host=localhost port=xxxxx dbname=regression -> Aggregate (actual rows=1 loops=1) -> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1) @@ -345,11 +347,11 @@ Sort (actual rows=50 loops=1) -> Custom Scan (Citus Adaptive) (actual rows=100 loops=1) Output: remote_scan.l_quantity, remote_scan.count_quantity Task Count: 2 - Tuple data received from nodes: 780 bytes + Tuple data received from nodes: 1800 bytes Tasks Shown: One of 2 -> Task Query: SELECT l_quantity, count(*) AS count_quantity FROM public.lineitem_290000 lineitem WHERE true GROUP BY l_quantity - Tuple data received from node: 390 bytes + Tuple data received from node: 900 bytes Node: host=localhost port=xxxxx dbname=regression -> HashAggregate (actual rows=50 loops=1) Output: l_quantity, count(*) @@ -991,16 +993,16 @@ Sort (actual rows=50 loops=1) Group Key: remote_scan.l_quantity -> Custom Scan (Citus Adaptive) (actual rows=100 loops=1) Task Count: 2 - Tuple data received from nodes: 780 bytes + Tuple data received from nodes: 1800 bytes Tasks Shown: All -> Task - Tuple data received from node: 390 bytes + Tuple data received from node: 900 bytes Node: host=localhost port=xxxxx dbname=regression -> HashAggregate (actual rows=50 loops=1) Group Key: l_quantity -> Seq Scan on lineitem_290000 lineitem (actual rows=6000 loops=1) -> Task - Tuple data received from node: 390 bytes + Tuple data received from node: 900 bytes Node: host=localhost port=xxxxx dbname=regression -> HashAggregate (actual rows=50 loops=1) Group Key: l_quantity @@ -1262,10 +1264,10 @@ Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=18) EXPLAIN (ANALYZE ON, COSTS OFF, TIMING OFF, SUMMARY OFF) EXECUTE router_executor_query_param(5); Custom Scan (Citus Adaptive) (actual rows=3 loops=1) Task Count: 1 - Tuple data received from nodes: 15 bytes + Tuple data received from nodes: 30 bytes Tasks Shown: All -> Task - Tuple data received from node: 15 bytes + Tuple data received from node: 30 bytes Node: host=localhost port=xxxxx dbname=regression -> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem (actual rows=3 loops=1) Index Cond: (l_orderkey = 5) @@ -1867,10 +1869,10 @@ SELECT create_distributed_table('explain_analyze_test', 'a'); EXPLAIN :default_analyze_flags SELECT * FROM explain_analyze_test WHERE a = 1; Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 8 bytes + Tuple data received from nodes: 11 bytes Tasks Shown: All -> Task - Tuple data received from node: 8 bytes + Tuple data received from node: 11 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=1 loops=1) Filter: (a = 1) @@ -1879,10 +1881,10 @@ EXPLAIN :default_analyze_flags SELECT count(*) FROM explain_analyze_test; Aggregate (actual rows=1 loops=1) -> Custom Scan (Citus Adaptive) (actual rows=4 loops=1) Task Count: 4 - Tuple data received from nodes: 4 bytes + Tuple data received from nodes: 32 bytes Tasks Shown: One of 4 -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 8 bytes Node: host=localhost port=xxxxx dbname=regression -> Aggregate (actual rows=1 loops=1) -> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=1 loops=1) @@ -2248,17 +2250,17 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Result destination: Send to 2 nodes -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task Count: 4 - Tuple data received from nodes: 21 bytes + Tuple data received from nodes: 120 bytes Tasks Shown: One of 4 -> Task - Tuple data received from node: 9 bytes + Tuple data received from node: 48 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_570017 dist_table (actual rows=4 loops=1) Task Count: 1 - Tuple data received from nodes: 2 bytes + Tuple data received from nodes: 8 bytes Tasks Shown: All -> Task - Tuple data received from node: 2 bytes + Tuple data received from node: 8 bytes Node: host=localhost port=xxxxx dbname=regression -> Aggregate (actual rows=1 loops=1) -> Hash Join (actual rows=10 loops=1) @@ -2271,10 +2273,10 @@ SELECT count(distinct a) FROM (SELECT GREATEST(random(), 2) r, a FROM dist_table Aggregate (actual rows=1 loops=1) -> Custom Scan (Citus Adaptive) (actual rows=4 loops=1) Task Count: 4 - Tuple data received from nodes: 4 bytes + Tuple data received from nodes: 32 bytes Tasks Shown: One of 4 -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 8 bytes Node: host=localhost port=xxxxx dbname=regression -> Aggregate (actual rows=1 loops=1) -> Merge Join (actual rows=4 loops=1) @@ -2314,10 +2316,10 @@ Aggregate (actual rows=1 loops=1) Sort Method: quicksort Memory: 25kB -> Seq Scan on ref_table_570021 ref_table (actual rows=10 loops=1) Task Count: 4 - Tuple data received from nodes: 4 bytes + Tuple data received from nodes: 32 bytes Tasks Shown: One of 4 -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 8 bytes Node: host=localhost port=xxxxx dbname=regression -> Aggregate (actual rows=1 loops=1) InitPlan 1 (returns $0) @@ -2340,10 +2342,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Result destination: Write locally -> Custom Scan (Citus Adaptive) (actual rows=20 loops=1) Task Count: 4 - Tuple data received from nodes: 44 bytes + Tuple data received from nodes: 160 bytes Tasks Shown: One of 4 -> Task - Tuple data received from node: 20 bytes + Tuple data received from node: 64 bytes Node: host=localhost port=xxxxx dbname=regression -> Insert on dist_table_570017 citus_table_alias (actual rows=8 loops=1) -> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1) @@ -2353,17 +2355,17 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Result destination: Write locally -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task Count: 1 - Tuple data received from nodes: 28 bytes + Tuple data received from nodes: 50 bytes Tasks Shown: All -> Task - Tuple data received from node: 28 bytes + Tuple data received from node: 50 bytes Node: host=localhost port=xxxxx dbname=regression -> Function Scan on read_intermediate_result intermediate_result (actual rows=10 loops=1) Task Count: 1 - Tuple data received from nodes: 2 bytes + Tuple data received from nodes: 8 bytes Tasks Shown: All -> Task - Tuple data received from node: 2 bytes + Tuple data received from node: 8 bytes Node: host=localhost port=xxxxx dbname=regression -> Aggregate (actual rows=1 loops=1) -> Function Scan on read_intermediate_result intermediate_result (actual rows=10 loops=1) @@ -2373,10 +2375,10 @@ prepare ref_select(int) AS select * from ref_table where 1 = $1; explain :default_analyze_flags execute ref_select(1); Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task Count: 1 - Tuple data received from nodes: 11 bytes + Tuple data received from nodes: 40 bytes Tasks Shown: All -> Task - Tuple data received from node: 11 bytes + Tuple data received from node: 40 bytes Node: host=localhost port=xxxxx dbname=regression -> Result (actual rows=10 loops=1) One-Time Filter: (1 = $1) @@ -2396,104 +2398,104 @@ SELECT create_distributed_table('dist_table_rep2', 'a'); EXPLAIN :default_analyze_flags INSERT INTO dist_table_rep1 VALUES(1), (2), (3), (4), (10), (100) RETURNING *; Custom Scan (Citus Adaptive) (actual rows=6 loops=1) Task Count: 2 - Tuple data received from nodes: 9 bytes + Tuple data received from nodes: 24 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 5 bytes + Tuple data received from node: 16 bytes Node: host=localhost port=xxxxx dbname=regression -> Insert on dist_table_rep1_570022 citus_table_alias (actual rows=4 loops=1) -> Values Scan on "*VALUES*" (actual rows=4 loops=1) EXPLAIN :default_analyze_flags SELECT * from dist_table_rep1; Custom Scan (Citus Adaptive) (actual rows=6 loops=1) Task Count: 2 - Tuple data received from nodes: 9 bytes + Tuple data received from nodes: 24 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 5 bytes + Tuple data received from node: 16 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1) EXPLAIN :default_analyze_flags INSERT INTO dist_table_rep2 VALUES(1), (2), (3), (4), (10), (100) RETURNING *; Custom Scan (Citus Adaptive) (actual rows=6 loops=1) Task Count: 2 - Tuple data received from nodes: 18 bytes + Tuple data received from nodes: 48 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 10 bytes + Tuple data received from node: 32 bytes Node: host=localhost port=xxxxx dbname=regression -> Insert on dist_table_rep2_570024 citus_table_alias (actual rows=4 loops=1) -> Values Scan on "*VALUES*" (actual rows=4 loops=1) EXPLAIN :default_analyze_flags SELECT * from dist_table_rep2; Custom Scan (Citus Adaptive) (actual rows=6 loops=1) Task Count: 2 - Tuple data received from nodes: 9 bytes + Tuple data received from nodes: 24 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 5 bytes + Tuple data received from node: 16 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep2_570024 dist_table_rep2 (actual rows=4 loops=1) prepare p1 as SELECT * FROM dist_table_rep1; EXPLAIN :default_analyze_flags EXECUTE p1; Custom Scan (Citus Adaptive) (actual rows=6 loops=1) Task Count: 2 - Tuple data received from nodes: 9 bytes + Tuple data received from nodes: 24 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 5 bytes + Tuple data received from node: 16 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1) EXPLAIN :default_analyze_flags EXECUTE p1; Custom Scan (Citus Adaptive) (actual rows=6 loops=1) Task Count: 2 - Tuple data received from nodes: 9 bytes + Tuple data received from nodes: 24 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 5 bytes + Tuple data received from node: 16 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1) EXPLAIN :default_analyze_flags EXECUTE p1; Custom Scan (Citus Adaptive) (actual rows=6 loops=1) Task Count: 2 - Tuple data received from nodes: 9 bytes + Tuple data received from nodes: 24 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 5 bytes + Tuple data received from node: 16 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1) EXPLAIN :default_analyze_flags EXECUTE p1; Custom Scan (Citus Adaptive) (actual rows=6 loops=1) Task Count: 2 - Tuple data received from nodes: 9 bytes + Tuple data received from nodes: 24 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 5 bytes + Tuple data received from node: 16 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1) EXPLAIN :default_analyze_flags EXECUTE p1; Custom Scan (Citus Adaptive) (actual rows=6 loops=1) Task Count: 2 - Tuple data received from nodes: 9 bytes + Tuple data received from nodes: 24 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 5 bytes + Tuple data received from node: 16 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1) EXPLAIN :default_analyze_flags EXECUTE p1; Custom Scan (Citus Adaptive) (actual rows=6 loops=1) Task Count: 2 - Tuple data received from nodes: 9 bytes + Tuple data received from nodes: 24 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 5 bytes + Tuple data received from node: 16 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1) prepare p2 AS SELECT * FROM dist_table_rep1 WHERE a = $1; EXPLAIN :default_analyze_flags EXECUTE p2(1); Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 1) @@ -2501,10 +2503,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) EXPLAIN :default_analyze_flags EXECUTE p2(1); Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 1) @@ -2512,10 +2514,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) EXPLAIN :default_analyze_flags EXECUTE p2(1); Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 1) @@ -2523,10 +2525,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) EXPLAIN :default_analyze_flags EXECUTE p2(1); Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 1) @@ -2534,10 +2536,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) EXPLAIN :default_analyze_flags EXECUTE p2(1); Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 1) @@ -2545,10 +2547,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) EXPLAIN :default_analyze_flags EXECUTE p2(1); Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 1) @@ -2556,10 +2558,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) EXPLAIN :default_analyze_flags EXECUTE p2(10); Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 2 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 2 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 10) @@ -2567,10 +2569,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) EXPLAIN :default_analyze_flags EXECUTE p2(100); Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 3 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 3 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570023 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 100) @@ -2579,10 +2581,10 @@ prepare p3 AS SELECT * FROM dist_table_rep1 WHERE a = 1; EXPLAIN :default_analyze_flags EXECUTE p3; Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 1) @@ -2590,10 +2592,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) EXPLAIN :default_analyze_flags EXECUTE p3; Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 1) @@ -2601,10 +2603,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) EXPLAIN :default_analyze_flags EXECUTE p3; Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 1) @@ -2612,10 +2614,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) EXPLAIN :default_analyze_flags EXECUTE p3; Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 1) @@ -2623,10 +2625,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) EXPLAIN :default_analyze_flags EXECUTE p3; Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 1) @@ -2634,10 +2636,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) EXPLAIN :default_analyze_flags EXECUTE p3; Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 1) @@ -2860,10 +2862,10 @@ Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Filter: (l_quantity < '-1'::numeric) Rows Removed by Filter: 2885 Task Count: 1 - Tuple data received from nodes: 11 bytes + Tuple data received from nodes: 40 bytes Tasks Shown: All -> Task - Tuple data received from node: 11 bytes + Tuple data received from node: 40 bytes Node: host=localhost port=xxxxx dbname=regression -> Function Scan on generate_series s (actual rows=10 loops=1) ROLLBACK; @@ -2920,7 +2922,7 @@ set citus.explain_analyze_sort_method to "taskId"; EXPLAIN (COSTS FALSE, ANALYZE TRUE, TIMING FALSE, SUMMARY FALSE) select a, CASE WHEN pg_sleep(0.4) IS NULL THEN 'x' END from explain_analyze_execution_time; Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 2 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: One of 2 -> Task Tuple data received from node: 0 bytes @@ -2930,10 +2932,10 @@ set citus.explain_analyze_sort_method to "execution-time"; EXPLAIN (COSTS FALSE, ANALYZE TRUE, TIMING FALSE, SUMMARY FALSE) select a, CASE WHEN pg_sleep(0.4) IS NULL THEN 'x' END from explain_analyze_execution_time; Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 2 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on explain_analyze_execution_time_570030 explain_analyze_execution_time (actual rows=1 loops=1) -- reset back @@ -2987,10 +2989,10 @@ Limit (actual rows=1 loops=1) Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on distributed_table_1_570032 distributed_table_xxx (actual rows=1 loops=1) Task Count: 2 - Tuple data received from nodes: 3 bytes + Tuple data received from nodes: 16 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 3 bytes + Tuple data received from node: 16 bytes Node: host=localhost port=xxxxx dbname=regression -> Limit (actual rows=1 loops=1) -> Nested Loop (actual rows=1 loops=1) diff --git a/src/test/regress/expected/multi_mx_function_call_delegation.out b/src/test/regress/expected/multi_mx_function_call_delegation.out index 309df4664..fe4e41efa 100644 --- a/src/test/regress/expected/multi_mx_function_call_delegation.out +++ b/src/test/regress/expected/multi_mx_function_call_delegation.out @@ -230,8 +230,6 @@ DEBUG: pushing down the function call (S,S) (1 row) --- this is fixed with pg14 and this will fail prior to --- pg 14 SET citus.enable_binary_protocol = TRUE; select mx_call_func_custom_types('S', 'A'); DEBUG: pushing down the function call diff --git a/src/test/regress/expected/multi_mx_function_call_delegation_0.out b/src/test/regress/expected/multi_mx_function_call_delegation_0.out index 835dd5263..034f8f5e8 100644 --- a/src/test/regress/expected/multi_mx_function_call_delegation_0.out +++ b/src/test/regress/expected/multi_mx_function_call_delegation_0.out @@ -230,15 +230,21 @@ DEBUG: pushing down the function call (S,S) (1 row) --- this is fixed with pg14 and this will fail prior to --- pg 14 SET citus.enable_binary_protocol = TRUE; select mx_call_func_custom_types('S', 'A'); DEBUG: pushing down the function call -ERROR: wrong data type: XXXX, expected XXXX + mx_call_func_custom_types +--------------------------------------------------------------------- + (S,S) +(1 row) + select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); DEBUG: pushing down the function call -ERROR: wrong data type: XXXX, expected XXXX + mx_call_func_custom_types +--------------------------------------------------------------------- + (S,S) +(1 row) + RESET citus.enable_binary_protocol; -- We don't allow distributing calls inside transactions begin; diff --git a/src/test/regress/expected/pg13.out b/src/test/regress/expected/pg13.out index 7b730ed64..01a93afe0 100644 --- a/src/test/regress/expected/pg13.out +++ b/src/test/regress/expected/pg13.out @@ -10,6 +10,8 @@ set search_path to test_pg13; SET citus.shard_replication_factor to 1; SET citus.shard_count to 2; SET citus.next_shard_id TO 65000; +-- Ensure tuple data in explain analyze output is the same on all PG versions +SET citus.enable_binary_protocol = TRUE; CREATE TABLE dist_table (name char, age int); CREATE INDEX name_index on dist_table(name); SELECT create_distributed_table('dist_table', 'name'); @@ -217,10 +219,10 @@ INSERT INTO test_wal VALUES(3,33),(4,44),(5,55) RETURNING *; --------------------------------------------------------------------- Custom Scan (Citus Adaptive) (actual rows=3 loops=1) Task Count: 1 - Tuple data received from nodes: 9 bytes + Tuple data received from nodes: 24 bytes Tasks Shown: All -> Task - Tuple data received from node: 9 bytes + Tuple data received from node: 24 bytes Node: host=localhost port=xxxxx dbname=regression -> Insert on test_wal_65012 citus_table_alias (actual rows=3 loops=1) WAL: records=3 bytes=189 diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index 6bf5af799..d1573522c 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -3,6 +3,8 @@ SET search_path TO single_node; SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; SET citus.next_shard_id TO 90630500; +-- Ensure tuple data in explain analyze output is the same on all PG versions +SET citus.enable_binary_protocol = TRUE; -- adding the coordinator as inactive is disallowed SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0); ERROR: coordinator node cannot be added as inactive node @@ -580,10 +582,10 @@ EXPLAIN (COSTS FALSE, ANALYZE TRUE, TIMING FALSE, SUMMARY FALSE) --------------------------------------------------------------------- Custom Scan (Citus Adaptive) (actual rows=5 loops=1) Task Count: 4 - Tuple data received from nodes: 10 bytes + Tuple data received from nodes: 40 bytes Tasks Shown: One of 4 -> Task - Tuple data received from node: 4 bytes + Tuple data received from node: 16 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on test_90630506 test (actual rows=2 loops=1) (8 rows) diff --git a/src/test/regress/expected/subquery_complex_target_list.out b/src/test/regress/expected/subquery_complex_target_list.out index 8ac4fac2e..934f89224 100644 --- a/src/test/regress/expected/subquery_complex_target_list.out +++ b/src/test/regress/expected/subquery_complex_target_list.out @@ -129,6 +129,10 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo."?column? -- we reset the client min_messages here to avoid adding an alternative output -- for pg14 as the output slightly differs. RESET client_min_messages; +-- Set binary protocol temporarily to true always, to get consistent float +-- output across PG versions. +-- This is a no-op for PG_VERSION_NUM >= 14 +SET citus.enable_binary_protocol = TRUE; -- Expressions inside the aggregates -- parts of the query is inspired by TPCH queries SELECT @@ -172,10 +176,13 @@ FROM ORDER BY 1 DESC; avg | cnt_1 | cnt_2 | cnt_3 | sum_1 | l_year | pos | count_pay --------------------------------------------------------------------- - 30.14666771571734992301 | 3308.14619815793 | 2.5000000000000000 | | 31 | 2017 | 0 | 1 + 30.14666771571734992301 | 3308.14619815794 | 2.5000000000000000 | | 31 | 2017 | 0 | 1 (1 row) SET client_min_messages TO DEBUG1; +-- Reset binary protocol back after temporaribly changing it +-- This is a no-op for PG_VERSION_NUM >= 14 +RESET citus.enable_binary_protocol; -- Multiple columns in GROUP BYs -- foo needs to be recursively planned, bar can be pushded down SELECT diff --git a/src/test/regress/expected/worker_disable_binary_worker_copy_format.out b/src/test/regress/expected/worker_disable_binary_worker_copy_format.out new file mode 100644 index 000000000..1653912fc --- /dev/null +++ b/src/test/regress/expected/worker_disable_binary_worker_copy_format.out @@ -0,0 +1,20 @@ +-- The files we use in the following text use the text based worker copy +-- format. So we disable the binary worker copy format here. +-- This is a no-op for PG_VERSION_NUM < 14, because the default is off there. +ALTER SYSTEM SET citus.binary_worker_copy_format TO off; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT success FROM run_command_on_workers('ALTER SYSTEM SET citus.binary_worker_copy_format TO off'); + success +--------------------------------------------------------------------- +(0 rows) + +SELECT success FROM run_command_on_workers('SELECT pg_reload_conf()'); + success +--------------------------------------------------------------------- +(0 rows) + diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index 5fd945320..4c21e7ffe 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -468,6 +468,8 @@ if ($useMitmproxy) { # make tests reproducible by never trying to negotiate ssl push(@pgOptions, "citus.node_conninfo='sslmode=disable'"); + # The commands that we intercept are based on the the text based protocol. + push(@pgOptions, "citus.enable_binary_protocol='false'"); } elsif ($followercluster) { diff --git a/src/test/regress/sql/binary_protocol.sql b/src/test/regress/sql/binary_protocol.sql index 4fb2a0cac..2e721d33a 100644 --- a/src/test/regress/sql/binary_protocol.sql +++ b/src/test/regress/sql/binary_protocol.sql @@ -1,7 +1,7 @@ SET citus.shard_count = 2; SET citus.next_shard_id TO 4754000; CREATE SCHEMA binary_protocol; -SET search_path TO binary_protocol; +SET search_path TO binary_protocol, public; SET citus.enable_binary_protocol = TRUE; CREATE TABLE t(id int); @@ -19,8 +19,6 @@ SELECT id, id, id, id, id, id, id, id, id, id FROM t ORDER BY id; --- EXPLAIN ANALYZE is currently forced to use text protocol. Once that is --- changed the numbers reported should change. EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE) SELECT id FROM t ORDER BY 1; SET citus.explain_all_tasks TO ON; EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE) SELECT id FROM t ORDER BY 1; @@ -39,30 +37,30 @@ CREATE TYPE composite_type AS ( CREATE TABLE composite_type_table ( id bigserial, - col composite_type[] + col composite_type ); - SELECT create_distributed_table('composite_type_table', 'id'); -INSERT INTO composite_type_table(col) VALUES (ARRAY[(1, 2)::composite_type]); - -SELECT * FROM composite_type_table; CREATE TYPE nested_composite_type AS ( a composite_type, b composite_type ); +select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.composite_type_domain AS binary_protocol.composite_type$$); +select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.nested_composite_type_domain AS binary_protocol.nested_composite_type$$); -CREATE TABLE nested_composite_type_table -( - id bigserial, - col nested_composite_type -); -SELECT create_distributed_table('nested_composite_type_table', 'id'); -INSERT INTO nested_composite_type_table(col) VALUES (((1, 2), (3,4))::nested_composite_type); - -SELECT * FROM nested_composite_type_table; +INSERT INTO composite_type_table(col) VALUES ((1, 2)::composite_type); +SELECT col FROM composite_type_table; +SELECT col::composite_type_domain FROM composite_type_table; +SELECT (col, col) FROM composite_type_table; +SELECT (col, col)::nested_composite_type FROM composite_type_table; +SELECT (col, col)::nested_composite_type_domain FROM composite_type_table; +SELECT ARRAY[col] FROM composite_type_table; +SELECT ARRAY[col::composite_type_domain] FROM composite_type_table; +SELECT ARRAY[(col, col)] FROM composite_type_table; +SELECT ARRAY[(col, col)::nested_composite_type] FROM composite_type_table; +SELECT ARRAY[(col, col)::nested_composite_type_domain] FROM composite_type_table; CREATE TABLE binaryless_builtin ( @@ -71,8 +69,25 @@ col2 character varying(255) NOT NULL ); SELECT create_reference_table('binaryless_builtin'); +CREATE TYPE binaryless_composite_type AS ( + a aclitem, + b aclitem +); + +select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.binaryless_domain AS aclitem$$); +select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.binaryless_composite_domain AS binary_protocol.binaryless_composite_type$$); + INSERT INTO binaryless_builtin VALUES ('user postgres=r/postgres', 'test'); -SELECT * FROM binaryless_builtin; +SELECT col1 FROM binaryless_builtin; +SELECT col1::binaryless_domain FROM binaryless_builtin; +SELECT (col1, col1) FROM binaryless_builtin; +SELECT (col1, col1)::binaryless_composite_type FROM binaryless_builtin; +SELECT (col1, col1)::binaryless_composite_domain FROM binaryless_builtin; +SELECT ARRAY[col1] FROM binaryless_builtin; +SELECT ARRAY[col1::binaryless_domain] FROM binaryless_builtin; +SELECT ARRAY[(col1, col1)] FROM binaryless_builtin; +SELECT ARRAY[(col1, col1)::binaryless_composite_type] FROM binaryless_builtin; +SELECT ARRAY[(col1, col1)::binaryless_composite_domain] FROM binaryless_builtin; CREATE TABLE test_table_1(id int, val1 int); CREATE TABLE test_table_2(id int, val1 bigint); @@ -85,6 +100,5 @@ SELECT id, val1 FROM test_table_1 LEFT JOIN test_table_2 USING(id, val1) ORDER BY 1, 2; -\set VERBOSITY terse +SET client_min_messages TO WARNING; DROP SCHEMA binary_protocol CASCADE; - diff --git a/src/test/regress/sql/intermediate_results.sql b/src/test/regress/sql/intermediate_results.sql index b260f5ea2..ae5402345 100644 --- a/src/test/regress/sql/intermediate_results.sql +++ b/src/test/regress/sql/intermediate_results.sql @@ -187,7 +187,7 @@ SELECT broadcast_intermediate_result('stored_squares_1', -- query the intermediate result in a router query using text format SELECT * FROM interesting_squares JOIN ( SELECT * FROM - read_intermediate_results(ARRAY['stored_squares_1', 'stored_squares_2'], 'binary') AS res (x int, x2 int, z intermediate_results.square_type) + read_intermediate_results(ARRAY['stored_squares_1', 'stored_squares_2'], 'text') AS res (x int, x2 int, z intermediate_results.square_type) ) squares ON (squares.x::text = interested_in) WHERE user_id = 'jon' ORDER BY 1,2; diff --git a/src/test/regress/sql/limit_intermediate_size.sql b/src/test/regress/sql/limit_intermediate_size.sql index 420923f84..5b013bf75 100644 --- a/src/test/regress/sql/limit_intermediate_size.sql +++ b/src/test/regress/sql/limit_intermediate_size.sql @@ -1,5 +1,10 @@ SET citus.enable_repartition_joins to ON; +-- The intermediate result limits chosen below are based on text sizes of the +-- intermediate results. This is a no-op for PG_VERSION_NUM < 14, because the +-- default is false there. +SET citus.enable_binary_protocol = FALSE; + SET citus.max_intermediate_result_size TO 2; -- should fail because the copy size is ~4kB for each cte WITH cte AS MATERIALIZED diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index 77974a9a2..b38e7b2f1 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -204,6 +204,8 @@ INSERT INTO distributed_table SELECT 1, '1',15 FROM distributed_table WHERE key -- sanity check: multi-shard INSERT..SELECT pushdown goes through distributed execution INSERT INTO distributed_table SELECT * FROM distributed_table ON CONFLICT DO NOTHING; +-- Ensure tuple data in explain analyze output is the same on all PG versions +SET citus.enable_binary_protocol = TRUE; -- EXPLAIN for local execution just works fine -- though going through distributed execution diff --git a/src/test/regress/sql/multi_explain.sql b/src/test/regress/sql/multi_explain.sql index d58477f72..d2a89d3ee 100644 --- a/src/test/regress/sql/multi_explain.sql +++ b/src/test/regress/sql/multi_explain.sql @@ -9,6 +9,9 @@ SET citus.next_shard_id TO 570000; SET citus.explain_distributed_queries TO on; SET citus.enable_repartition_joins to ON; +-- Ensure tuple data in explain analyze output is the same on all PG versions +SET citus.enable_binary_protocol = TRUE; + -- Function that parses explain output as JSON CREATE FUNCTION explain_json(query text) RETURNS jsonb diff --git a/src/test/regress/sql/multi_mx_function_call_delegation.sql b/src/test/regress/sql/multi_mx_function_call_delegation.sql index 513146a4e..46d91ba39 100644 --- a/src/test/regress/sql/multi_mx_function_call_delegation.sql +++ b/src/test/regress/sql/multi_mx_function_call_delegation.sql @@ -102,15 +102,11 @@ select squares(4); select multi_mx_function_call_delegation.mx_call_func(2, 0); select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); - --- this is fixed with pg14 and this will fail prior to --- pg 14 SET citus.enable_binary_protocol = TRUE; select mx_call_func_custom_types('S', 'A'); select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); RESET citus.enable_binary_protocol; - -- We don't allow distributing calls inside transactions begin; select mx_call_func(2, 0); diff --git a/src/test/regress/sql/pg13.sql b/src/test/regress/sql/pg13.sql index 4145acf77..8c1e1b771 100644 --- a/src/test/regress/sql/pg13.sql +++ b/src/test/regress/sql/pg13.sql @@ -14,6 +14,9 @@ SET citus.shard_replication_factor to 1; SET citus.shard_count to 2; SET citus.next_shard_id TO 65000; +-- Ensure tuple data in explain analyze output is the same on all PG versions +SET citus.enable_binary_protocol = TRUE; + CREATE TABLE dist_table (name char, age int); CREATE INDEX name_index on dist_table(name); diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 228039d34..2ee0d456b 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -4,6 +4,9 @@ SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; SET citus.next_shard_id TO 90630500; +-- Ensure tuple data in explain analyze output is the same on all PG versions +SET citus.enable_binary_protocol = TRUE; + -- adding the coordinator as inactive is disallowed SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0); diff --git a/src/test/regress/sql/subquery_complex_target_list.sql b/src/test/regress/sql/subquery_complex_target_list.sql index 1579ff017..932fb1b73 100644 --- a/src/test/regress/sql/subquery_complex_target_list.sql +++ b/src/test/regress/sql/subquery_complex_target_list.sql @@ -90,6 +90,12 @@ FROM -- we reset the client min_messages here to avoid adding an alternative output -- for pg14 as the output slightly differs. RESET client_min_messages; + +-- Set binary protocol temporarily to true always, to get consistent float +-- output across PG versions. +-- This is a no-op for PG_VERSION_NUM >= 14 +SET citus.enable_binary_protocol = TRUE; + -- Expressions inside the aggregates -- parts of the query is inspired by TPCH queries SELECT @@ -133,6 +139,9 @@ FROM ORDER BY 1 DESC; SET client_min_messages TO DEBUG1; +-- Reset binary protocol back after temporaribly changing it +-- This is a no-op for PG_VERSION_NUM >= 14 +RESET citus.enable_binary_protocol; -- Multiple columns in GROUP BYs -- foo needs to be recursively planned, bar can be pushded down diff --git a/src/test/regress/sql/worker_disable_binary_worker_copy_format.sql b/src/test/regress/sql/worker_disable_binary_worker_copy_format.sql new file mode 100644 index 000000000..0482c6d9d --- /dev/null +++ b/src/test/regress/sql/worker_disable_binary_worker_copy_format.sql @@ -0,0 +1,8 @@ +-- The files we use in the following text use the text based worker copy +-- format. So we disable the binary worker copy format here. +-- This is a no-op for PG_VERSION_NUM < 14, because the default is off there. +ALTER SYSTEM SET citus.binary_worker_copy_format TO off; +SELECT pg_reload_conf(); +SELECT success FROM run_command_on_workers('ALTER SYSTEM SET citus.binary_worker_copy_format TO off'); +SELECT success FROM run_command_on_workers('SELECT pg_reload_conf()'); + diff --git a/src/test/regress/worker_schedule b/src/test/regress/worker_schedule index e702be378..65cf479b7 100644 --- a/src/test/regress/worker_schedule +++ b/src/test/regress/worker_schedule @@ -14,6 +14,7 @@ test: worker_copy # ---------- # Range and hash re-partitioning related regression tests # ---------- +test: worker_disable_binary_worker_copy_format test: worker_range_partition worker_range_partition_complex test: worker_hash_partition worker_hash_partition_complex test: worker_merge_range_files worker_merge_hash_files