diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 993934d3d..eb1bf4d31 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -102,6 +102,7 @@ #include "libpq/pqformat.h" #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" +#include "parser/parse_type.h" #if PG_VERSION_NUM >= PG_VERSION_13 #include "tcop/cmdtag.h" #endif @@ -998,9 +999,8 @@ CanUseBinaryCopyFormatForTargetList(List *targetEntryList) /* * CanUseBinaryCopyFormatForType determines whether it is safe to use the - * binary copy format for the given type. The binary copy format cannot - * be used for arrays or composite types that contain user-defined types, - * or when there is no binary output function defined. + * binary copy format for the given type. See contents of the function for + * details of when it's safe to use binary copy. */ bool CanUseBinaryCopyFormatForType(Oid typeId) @@ -1015,14 +1015,79 @@ CanUseBinaryCopyFormatForType(Oid typeId) return false; } - if (typeId >= FirstNormalObjectId) + /* + * A row type can contain any types, possibly types that don't have + * the binary input and output functions defined. + */ + if (type_is_rowtype(typeId)) { - char typeCategory = '\0'; - bool typePreferred = false; + /* + * TODO: Inspect the types inside the record and check if all of them + * can be binary encoded. If so, it's safe to use binary encoding. + * + * IMPORTANT: When implementing this todo keep the following in mind: + * + * In PG versions before PG14 the record_recv function would error out + * more than necessary. + * + * It errors out when any of the columns in the row have a type oid + * that doesn't match with the oid in the received data. This happens + * pretty much always for non built in types, because their oids differ + * between postgres intallations. So for those Postgres versions we + * would need a check like the following for each column: + * + * if (columnType >= FirstNormalObjectId) { + * return false + * } + */ + return false; + } - get_type_category_preferred(typeId, &typeCategory, &typePreferred); - if (typeCategory == TYPCATEGORY_ARRAY || - typeCategory == TYPCATEGORY_COMPOSITE) + HeapTuple typeTup = typeidType(typeId); + Form_pg_type type = (Form_pg_type) GETSTRUCT(typeTup); + Oid elementType = type->typelem; +#if PG_VERSION_NUM < PG_VERSION_14 + char typeCategory = type->typcategory; +#endif + ReleaseSysCache(typeTup); + +#if PG_VERSION_NUM < PG_VERSION_14 + + /* + * In PG versions before PG14 the array_recv function would error out more + * than necessary. + * + * It errors out when the element type its oids don't match with the oid in + * the received data. This happens pretty much always for non built in + * types, because their oids differ between postgres intallations. So we + * skip binary encoding when the element type is a non built in type. + */ + if (typeCategory == TYPCATEGORY_ARRAY && elementType >= FirstNormalObjectId) + { + return false; + } +#endif + + /* + * Any type that is a wrapper around an element type (e.g. arrays and + * ranges) require the element type to also has support for binary + * encoding. + */ + if (elementType != InvalidOid) + { + if (!CanUseBinaryCopyFormatForType(elementType)) + { + return false; + } + } + + /* + * For domains, make sure that the underlying type can be binary copied. + */ + Oid baseTypeId = getBaseType(typeId); + if (typeId != baseTypeId) + { + if (!CanUseBinaryCopyFormatForType(baseTypeId)) { return false; } diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index ed4ad6b07..21c1b4e1e 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -476,7 +476,11 @@ struct TaskPlacementExecution; /* GUC, determining whether Citus opens 1 connection per task */ bool ForceMaxQueryParallelization = false; int MaxAdaptiveExecutorPoolSize = 16; +#if PG_VERSION_NUM >= PG_VERSION_14 +bool EnableBinaryProtocol = true; +#else bool EnableBinaryProtocol = false; +#endif /* GUC, number of ms to wait between opening connections to the same worker */ int ExecutorSlowStartInterval = 10; @@ -2023,13 +2027,7 @@ SetAttributeInputMetadata(DistributedExecution *execution, { attInMetadata = NULL; } - /* - * We only allow binary results when queryCount is 1, because we - * cannot use binary results with SendRemoteCommand. Which must be - * used if queryCount is larger than 1. - */ - else if (EnableBinaryProtocol && queryCount == 1 && - CanUseBinaryCopyFormat(tupleDescriptor)) + else if (EnableBinaryProtocol && CanUseBinaryCopyFormat(tupleDescriptor)) { attInMetadata = TupleDescGetAttBinaryInMetadata(tupleDescriptor); shardCommandExecution->binaryResults = true; diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index e8e6335e5..c37b8a626 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -575,7 +575,11 @@ RegisterCitusConfigVariables(void) "in PostgreSQL's binary serialization format when " "joining large tables."), &BinaryWorkerCopyFormat, +#if PG_VERSION_NUM >= PG_VERSION_14 + true, +#else false, +#endif PGC_SIGHUP, GUC_STANDARD, NULL, NULL, NULL); @@ -742,7 +746,11 @@ RegisterCitusConfigVariables(void) "Enables communication between nodes using binary protocol when possible"), NULL, &EnableBinaryProtocol, +#if PG_VERSION_NUM >= PG_VERSION_14 + true, +#else false, +#endif PGC_USERSET, GUC_STANDARD, NULL, NULL, NULL); diff --git a/src/backend/distributed/worker/worker_partition_protocol.c b/src/backend/distributed/worker/worker_partition_protocol.c index 7dc6c75ee..6f6a07f04 100644 --- a/src/backend/distributed/worker/worker_partition_protocol.c +++ b/src/backend/distributed/worker/worker_partition_protocol.c @@ -49,7 +49,11 @@ /* Config variables managed via guc.c */ +#if PG_VERSION_NUM >= PG_VERSION_14 +bool BinaryWorkerCopyFormat = true; /* binary format for copying between workers */ +#else bool BinaryWorkerCopyFormat = false; /* binary format for copying between workers */ +#endif int PartitionBufferSize = 16384; /* total partitioning buffer size in KB */ /* Local variables */ diff --git a/src/test/regress/expected/binary_protocol.out b/src/test/regress/expected/binary_protocol.out index fe68a4d39..607f3d89d 100644 --- a/src/test/regress/expected/binary_protocol.out +++ b/src/test/regress/expected/binary_protocol.out @@ -1,7 +1,7 @@ SET citus.shard_count = 2; SET citus.next_shard_id TO 4754000; CREATE SCHEMA binary_protocol; -SET search_path TO binary_protocol; +SET search_path TO binary_protocol, public; SET citus.enable_binary_protocol = TRUE; CREATE TABLE t(id int); SELECT create_distributed_table('t', 'id'); @@ -48,8 +48,6 @@ SELECT id, id, id, id, id, 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 (10 rows) --- EXPLAIN ANALYZE is currently forced to use text protocol. Once that is --- changed the numbers reported should change. EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE) SELECT id FROM t ORDER BY 1; QUERY PLAN --------------------------------------------------------------------- @@ -58,10 +56,10 @@ EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE) SELECT id FROM Sort Method: quicksort Memory: 25kB -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task Count: 2 - Tuple data received from nodes: 11 bytes + Tuple data received from nodes: 40 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 8 bytes + Tuple data received from node: 28 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on t_4754000 t (actual rows=7 loops=1) (11 rows) @@ -75,14 +73,14 @@ EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE) SELECT id FROM Sort Method: quicksort Memory: 25kB -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task Count: 2 - Tuple data received from nodes: 11 bytes + Tuple data received from nodes: 40 bytes Tasks Shown: All -> Task - Tuple data received from node: 8 bytes + Tuple data received from node: 28 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on t_4754000 t (actual rows=7 loops=1) -> Task - Tuple data received from node: 3 bytes + Tuple data received from node: 12 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on t_4754001 t (actual rows=3 loops=1) (15 rows) @@ -123,7 +121,7 @@ CREATE TYPE composite_type AS ( CREATE TABLE composite_type_table ( id bigserial, - col composite_type[] + col composite_type ); SELECT create_distributed_table('composite_type_table', 'id'); create_distributed_table @@ -131,33 +129,81 @@ SELECT create_distributed_table('composite_type_table', 'id'); (1 row) -INSERT INTO composite_type_table(col) VALUES (ARRAY[(1, 2)::composite_type]); -SELECT * FROM composite_type_table; - id | col ---------------------------------------------------------------------- - 1 | {"(1,2)"} -(1 row) - CREATE TYPE nested_composite_type AS ( a composite_type, b composite_type ); -CREATE TABLE nested_composite_type_table -( - id bigserial, - col nested_composite_type -); -SELECT create_distributed_table('nested_composite_type_table', 'id'); - create_distributed_table +select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.composite_type_domain AS binary_protocol.composite_type$$); + run_command_on_master_and_workers --------------------------------------------------------------------- (1 row) -INSERT INTO nested_composite_type_table(col) VALUES (((1, 2), (3,4))::nested_composite_type); -SELECT * FROM nested_composite_type_table; - id | col +select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.nested_composite_type_domain AS binary_protocol.nested_composite_type$$); + run_command_on_master_and_workers --------------------------------------------------------------------- - 1 | ("(1,2)","(3,4)") + +(1 row) + +INSERT INTO composite_type_table(col) VALUES ((1, 2)::composite_type); +SELECT col FROM composite_type_table; + col +--------------------------------------------------------------------- + (1,2) +(1 row) + +SELECT col::composite_type_domain FROM composite_type_table; + col +--------------------------------------------------------------------- + (1,2) +(1 row) + +SELECT (col, col) FROM composite_type_table; + row +--------------------------------------------------------------------- + ("(1,2)","(1,2)") +(1 row) + +SELECT (col, col)::nested_composite_type FROM composite_type_table; + row +--------------------------------------------------------------------- + ("(1,2)","(1,2)") +(1 row) + +SELECT (col, col)::nested_composite_type_domain FROM composite_type_table; + row +--------------------------------------------------------------------- + ("(1,2)","(1,2)") +(1 row) + +SELECT ARRAY[col] FROM composite_type_table; + array +--------------------------------------------------------------------- + {"(1,2)"} +(1 row) + +SELECT ARRAY[col::composite_type_domain] FROM composite_type_table; + array +--------------------------------------------------------------------- + {"(1,2)"} +(1 row) + +SELECT ARRAY[(col, col)] FROM composite_type_table; + array +--------------------------------------------------------------------- + {"(\"(1,2)\",\"(1,2)\")"} +(1 row) + +SELECT ARRAY[(col, col)::nested_composite_type] FROM composite_type_table; + array +--------------------------------------------------------------------- + {"(\"(1,2)\",\"(1,2)\")"} +(1 row) + +SELECT ARRAY[(col, col)::nested_composite_type_domain] FROM composite_type_table; + array +--------------------------------------------------------------------- + {"(\"(1,2)\",\"(1,2)\")"} (1 row) CREATE TABLE binaryless_builtin ( @@ -170,11 +216,81 @@ SELECT create_reference_table('binaryless_builtin'); (1 row) -INSERT INTO binaryless_builtin VALUES ('user postgres=r/postgres', 'test'); -SELECT * FROM binaryless_builtin; - col1 | col2 +CREATE TYPE binaryless_composite_type AS ( + a aclitem, + b aclitem +); +select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.binaryless_domain AS aclitem$$); + run_command_on_master_and_workers --------------------------------------------------------------------- - postgres=r/postgres | test + +(1 row) + +select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.binaryless_composite_domain AS binary_protocol.binaryless_composite_type$$); + run_command_on_master_and_workers +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO binaryless_builtin VALUES ('user postgres=r/postgres', 'test'); +SELECT col1 FROM binaryless_builtin; + col1 +--------------------------------------------------------------------- + postgres=r/postgres +(1 row) + +SELECT col1::binaryless_domain FROM binaryless_builtin; + col1 +--------------------------------------------------------------------- + postgres=r/postgres +(1 row) + +SELECT (col1, col1) FROM binaryless_builtin; + row +--------------------------------------------------------------------- + (postgres=r/postgres,postgres=r/postgres) +(1 row) + +SELECT (col1, col1)::binaryless_composite_type FROM binaryless_builtin; + row +--------------------------------------------------------------------- + (postgres=r/postgres,postgres=r/postgres) +(1 row) + +SELECT (col1, col1)::binaryless_composite_domain FROM binaryless_builtin; + row +--------------------------------------------------------------------- + (postgres=r/postgres,postgres=r/postgres) +(1 row) + +SELECT ARRAY[col1] FROM binaryless_builtin; + array +--------------------------------------------------------------------- + {postgres=r/postgres} +(1 row) + +SELECT ARRAY[col1::binaryless_domain] FROM binaryless_builtin; + array +--------------------------------------------------------------------- + {postgres=r/postgres} +(1 row) + +SELECT ARRAY[(col1, col1)] FROM binaryless_builtin; + array +--------------------------------------------------------------------- + {"(postgres=r/postgres,postgres=r/postgres)"} +(1 row) + +SELECT ARRAY[(col1, col1)::binaryless_composite_type] FROM binaryless_builtin; + array +--------------------------------------------------------------------- + {"(postgres=r/postgres,postgres=r/postgres)"} +(1 row) + +SELECT ARRAY[(col1, col1)::binaryless_composite_domain] FROM binaryless_builtin; + array +--------------------------------------------------------------------- + {"(postgres=r/postgres,postgres=r/postgres)"} (1 row) CREATE TABLE test_table_1(id int, val1 int); @@ -203,6 +319,5 @@ ORDER BY 1, 2; 3 | 3 (3 rows) -\set VERBOSITY terse +SET client_min_messages TO WARNING; DROP SCHEMA binary_protocol CASCADE; -NOTICE: drop cascades to 8 other objects diff --git a/src/test/regress/expected/intermediate_results.out b/src/test/regress/expected/intermediate_results.out index 77b42ae08..c510b07b3 100644 --- a/src/test/regress/expected/intermediate_results.out +++ b/src/test/regress/expected/intermediate_results.out @@ -383,7 +383,7 @@ SELECT broadcast_intermediate_result('stored_squares_1', -- query the intermediate result in a router query using text format SELECT * FROM interesting_squares JOIN ( SELECT * FROM - read_intermediate_results(ARRAY['stored_squares_1', 'stored_squares_2'], 'binary') AS res (x int, x2 int, z intermediate_results.square_type) + read_intermediate_results(ARRAY['stored_squares_1', 'stored_squares_2'], 'text') AS res (x int, x2 int, z intermediate_results.square_type) ) squares ON (squares.x::text = interested_in) WHERE user_id = 'jon' ORDER BY 1,2; user_id | interested_in | x | x2 | z diff --git a/src/test/regress/expected/limit_intermediate_size.out b/src/test/regress/expected/limit_intermediate_size.out index d4c60d78f..662ce0e05 100644 --- a/src/test/regress/expected/limit_intermediate_size.out +++ b/src/test/regress/expected/limit_intermediate_size.out @@ -1,4 +1,8 @@ SET citus.enable_repartition_joins to ON; +-- The intermediate result limits chosen below are based on text sizes of the +-- intermediate results. This is a no-op for PG_VERSION_NUM < 14, because the +-- default is false there. +SET citus.enable_binary_protocol = FALSE; SET citus.max_intermediate_result_size TO 2; -- should fail because the copy size is ~4kB for each cte WITH cte AS MATERIALIZED diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index d248fa6de..c104d2e75 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -287,6 +287,8 @@ NOTICE: executing the command locally: INSERT INTO local_shard_execution.distri INSERT INTO distributed_table SELECT 1, '1',15 FROM distributed_table WHERE key = 2 LIMIT 1 ON CONFLICT DO NOTHING; -- sanity check: multi-shard INSERT..SELECT pushdown goes through distributed execution INSERT INTO distributed_table SELECT * FROM distributed_table ON CONFLICT DO NOTHING; +-- Ensure tuple data in explain analyze output is the same on all PG versions +SET citus.enable_binary_protocol = TRUE; -- EXPLAIN for local execution just works fine -- though going through distributed execution EXPLAIN (COSTS OFF) SELECT * FROM distributed_table WHERE key = 1 AND age = 20; @@ -307,10 +309,10 @@ EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM distribute --------------------------------------------------------------------- Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 5 bytes + Tuple data received from nodes: 14 bytes Tasks Shown: All -> Task - Tuple data received from node: 5 bytes + Tuple data received from node: 14 bytes Node: host=localhost port=xxxxx dbname=regression -> Index Scan using distributed_table_pkey_1470001 on distributed_table_1470001 distributed_table (actual rows=1 loops=1) Index Cond: (key = 1) @@ -328,17 +330,17 @@ SELECT 1 FROM r WHERE z < 3; Result destination: Write locally -> Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 4 - Tuple data received from nodes: 6 bytes + Tuple data received from nodes: 22 bytes Tasks Shown: One of 4 -> Task - Tuple data received from node: 6 bytes + Tuple data received from node: 22 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on distributed_table_1470001 distributed_table (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1) Filter: (z < '3'::double precision) diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index 7eb3eb0d4..140ce1445 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -5,6 +5,8 @@ SET citus.next_shard_id TO 570000; \a\t SET citus.explain_distributed_queries TO on; SET citus.enable_repartition_joins to ON; +-- Ensure tuple data in explain analyze output is the same on all PG versions +SET citus.enable_binary_protocol = TRUE; -- Function that parses explain output as JSON CREATE FUNCTION explain_json(query text) RETURNS jsonb @@ -274,10 +276,10 @@ Sort (actual rows=50 loops=1) Group Key: remote_scan.l_quantity -> Custom Scan (Citus Adaptive) (actual rows=100 loops=1) Task Count: 2 - Tuple data received from nodes: 780 bytes + Tuple data received from nodes: 1800 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 390 bytes + Tuple data received from node: 900 bytes Node: host=localhost port=xxxxx dbname=regression -> HashAggregate (actual rows=50 loops=1) Group Key: l_quantity @@ -294,7 +296,7 @@ EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off) SELECT count(*) FROM t1 Aggregate (actual rows=1 loops=1) -> Custom Scan (Citus Adaptive) (actual rows=4 loops=1) Task Count: 4 - Tuple data received from nodes: 4 bytes + Tuple data received from nodes: 32 bytes Tasks Shown: None, not supported for re-partition queries -> MapMergeJob Map Task Count: 3 @@ -313,7 +315,7 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) -> Aggregate (actual rows=1 loops=1) -> Custom Scan (Citus Adaptive) (actual rows=4 loops=1) Task Count: 4 - Tuple data received from nodes: 4 bytes + Tuple data received from nodes: 32 bytes Tasks Shown: None, not supported for re-partition queries -> MapMergeJob Map Task Count: 3 @@ -322,10 +324,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Map Task Count: 3 Merge Task Count: 4 Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 8 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 8 bytes Node: host=localhost port=xxxxx dbname=regression -> Aggregate (actual rows=1 loops=1) -> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1) @@ -345,11 +347,11 @@ Sort (actual rows=50 loops=1) -> Custom Scan (Citus Adaptive) (actual rows=100 loops=1) Output: remote_scan.l_quantity, remote_scan.count_quantity Task Count: 2 - Tuple data received from nodes: 780 bytes + Tuple data received from nodes: 1800 bytes Tasks Shown: One of 2 -> Task Query: SELECT l_quantity, count(*) AS count_quantity FROM public.lineitem_290000 lineitem WHERE true GROUP BY l_quantity - Tuple data received from node: 390 bytes + Tuple data received from node: 900 bytes Node: host=localhost port=xxxxx dbname=regression -> HashAggregate (actual rows=50 loops=1) Output: l_quantity, count(*) @@ -991,16 +993,16 @@ Sort (actual rows=50 loops=1) Group Key: remote_scan.l_quantity -> Custom Scan (Citus Adaptive) (actual rows=100 loops=1) Task Count: 2 - Tuple data received from nodes: 780 bytes + Tuple data received from nodes: 1800 bytes Tasks Shown: All -> Task - Tuple data received from node: 390 bytes + Tuple data received from node: 900 bytes Node: host=localhost port=xxxxx dbname=regression -> HashAggregate (actual rows=50 loops=1) Group Key: l_quantity -> Seq Scan on lineitem_290000 lineitem (actual rows=6000 loops=1) -> Task - Tuple data received from node: 390 bytes + Tuple data received from node: 900 bytes Node: host=localhost port=xxxxx dbname=regression -> HashAggregate (actual rows=50 loops=1) Group Key: l_quantity @@ -1262,10 +1264,10 @@ Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=18) EXPLAIN (ANALYZE ON, COSTS OFF, TIMING OFF, SUMMARY OFF) EXECUTE router_executor_query_param(5); Custom Scan (Citus Adaptive) (actual rows=3 loops=1) Task Count: 1 - Tuple data received from nodes: 15 bytes + Tuple data received from nodes: 30 bytes Tasks Shown: All -> Task - Tuple data received from node: 15 bytes + Tuple data received from node: 30 bytes Node: host=localhost port=xxxxx dbname=regression -> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem (actual rows=3 loops=1) Index Cond: (l_orderkey = 5) @@ -1867,10 +1869,10 @@ SELECT create_distributed_table('explain_analyze_test', 'a'); EXPLAIN :default_analyze_flags SELECT * FROM explain_analyze_test WHERE a = 1; Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 8 bytes + Tuple data received from nodes: 11 bytes Tasks Shown: All -> Task - Tuple data received from node: 8 bytes + Tuple data received from node: 11 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=1 loops=1) Filter: (a = 1) @@ -1879,10 +1881,10 @@ EXPLAIN :default_analyze_flags SELECT count(*) FROM explain_analyze_test; Aggregate (actual rows=1 loops=1) -> Custom Scan (Citus Adaptive) (actual rows=4 loops=1) Task Count: 4 - Tuple data received from nodes: 4 bytes + Tuple data received from nodes: 32 bytes Tasks Shown: One of 4 -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 8 bytes Node: host=localhost port=xxxxx dbname=regression -> Aggregate (actual rows=1 loops=1) -> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=1 loops=1) @@ -2248,17 +2250,17 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Result destination: Send to 2 nodes -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task Count: 4 - Tuple data received from nodes: 21 bytes + Tuple data received from nodes: 120 bytes Tasks Shown: One of 4 -> Task - Tuple data received from node: 9 bytes + Tuple data received from node: 48 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_570017 dist_table (actual rows=4 loops=1) Task Count: 1 - Tuple data received from nodes: 2 bytes + Tuple data received from nodes: 8 bytes Tasks Shown: All -> Task - Tuple data received from node: 2 bytes + Tuple data received from node: 8 bytes Node: host=localhost port=xxxxx dbname=regression -> Aggregate (actual rows=1 loops=1) -> Hash Join (actual rows=10 loops=1) @@ -2271,10 +2273,10 @@ SELECT count(distinct a) FROM (SELECT GREATEST(random(), 2) r, a FROM dist_table Aggregate (actual rows=1 loops=1) -> Custom Scan (Citus Adaptive) (actual rows=4 loops=1) Task Count: 4 - Tuple data received from nodes: 4 bytes + Tuple data received from nodes: 32 bytes Tasks Shown: One of 4 -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 8 bytes Node: host=localhost port=xxxxx dbname=regression -> Aggregate (actual rows=1 loops=1) -> Merge Join (actual rows=4 loops=1) @@ -2314,10 +2316,10 @@ Aggregate (actual rows=1 loops=1) Sort Method: quicksort Memory: 25kB -> Seq Scan on ref_table_570021 ref_table (actual rows=10 loops=1) Task Count: 4 - Tuple data received from nodes: 4 bytes + Tuple data received from nodes: 32 bytes Tasks Shown: One of 4 -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 8 bytes Node: host=localhost port=xxxxx dbname=regression -> Aggregate (actual rows=1 loops=1) InitPlan 1 (returns $0) @@ -2340,10 +2342,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Result destination: Write locally -> Custom Scan (Citus Adaptive) (actual rows=20 loops=1) Task Count: 4 - Tuple data received from nodes: 44 bytes + Tuple data received from nodes: 160 bytes Tasks Shown: One of 4 -> Task - Tuple data received from node: 20 bytes + Tuple data received from node: 64 bytes Node: host=localhost port=xxxxx dbname=regression -> Insert on dist_table_570017 citus_table_alias (actual rows=8 loops=1) -> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1) @@ -2353,17 +2355,17 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Result destination: Write locally -> Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task Count: 1 - Tuple data received from nodes: 28 bytes + Tuple data received from nodes: 50 bytes Tasks Shown: All -> Task - Tuple data received from node: 28 bytes + Tuple data received from node: 50 bytes Node: host=localhost port=xxxxx dbname=regression -> Function Scan on read_intermediate_result intermediate_result (actual rows=10 loops=1) Task Count: 1 - Tuple data received from nodes: 2 bytes + Tuple data received from nodes: 8 bytes Tasks Shown: All -> Task - Tuple data received from node: 2 bytes + Tuple data received from node: 8 bytes Node: host=localhost port=xxxxx dbname=regression -> Aggregate (actual rows=1 loops=1) -> Function Scan on read_intermediate_result intermediate_result (actual rows=10 loops=1) @@ -2373,10 +2375,10 @@ prepare ref_select(int) AS select * from ref_table where 1 = $1; explain :default_analyze_flags execute ref_select(1); Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Task Count: 1 - Tuple data received from nodes: 11 bytes + Tuple data received from nodes: 40 bytes Tasks Shown: All -> Task - Tuple data received from node: 11 bytes + Tuple data received from node: 40 bytes Node: host=localhost port=xxxxx dbname=regression -> Result (actual rows=10 loops=1) One-Time Filter: (1 = $1) @@ -2396,104 +2398,104 @@ SELECT create_distributed_table('dist_table_rep2', 'a'); EXPLAIN :default_analyze_flags INSERT INTO dist_table_rep1 VALUES(1), (2), (3), (4), (10), (100) RETURNING *; Custom Scan (Citus Adaptive) (actual rows=6 loops=1) Task Count: 2 - Tuple data received from nodes: 9 bytes + Tuple data received from nodes: 24 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 5 bytes + Tuple data received from node: 16 bytes Node: host=localhost port=xxxxx dbname=regression -> Insert on dist_table_rep1_570022 citus_table_alias (actual rows=4 loops=1) -> Values Scan on "*VALUES*" (actual rows=4 loops=1) EXPLAIN :default_analyze_flags SELECT * from dist_table_rep1; Custom Scan (Citus Adaptive) (actual rows=6 loops=1) Task Count: 2 - Tuple data received from nodes: 9 bytes + Tuple data received from nodes: 24 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 5 bytes + Tuple data received from node: 16 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1) EXPLAIN :default_analyze_flags INSERT INTO dist_table_rep2 VALUES(1), (2), (3), (4), (10), (100) RETURNING *; Custom Scan (Citus Adaptive) (actual rows=6 loops=1) Task Count: 2 - Tuple data received from nodes: 18 bytes + Tuple data received from nodes: 48 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 10 bytes + Tuple data received from node: 32 bytes Node: host=localhost port=xxxxx dbname=regression -> Insert on dist_table_rep2_570024 citus_table_alias (actual rows=4 loops=1) -> Values Scan on "*VALUES*" (actual rows=4 loops=1) EXPLAIN :default_analyze_flags SELECT * from dist_table_rep2; Custom Scan (Citus Adaptive) (actual rows=6 loops=1) Task Count: 2 - Tuple data received from nodes: 9 bytes + Tuple data received from nodes: 24 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 5 bytes + Tuple data received from node: 16 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep2_570024 dist_table_rep2 (actual rows=4 loops=1) prepare p1 as SELECT * FROM dist_table_rep1; EXPLAIN :default_analyze_flags EXECUTE p1; Custom Scan (Citus Adaptive) (actual rows=6 loops=1) Task Count: 2 - Tuple data received from nodes: 9 bytes + Tuple data received from nodes: 24 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 5 bytes + Tuple data received from node: 16 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1) EXPLAIN :default_analyze_flags EXECUTE p1; Custom Scan (Citus Adaptive) (actual rows=6 loops=1) Task Count: 2 - Tuple data received from nodes: 9 bytes + Tuple data received from nodes: 24 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 5 bytes + Tuple data received from node: 16 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1) EXPLAIN :default_analyze_flags EXECUTE p1; Custom Scan (Citus Adaptive) (actual rows=6 loops=1) Task Count: 2 - Tuple data received from nodes: 9 bytes + Tuple data received from nodes: 24 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 5 bytes + Tuple data received from node: 16 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1) EXPLAIN :default_analyze_flags EXECUTE p1; Custom Scan (Citus Adaptive) (actual rows=6 loops=1) Task Count: 2 - Tuple data received from nodes: 9 bytes + Tuple data received from nodes: 24 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 5 bytes + Tuple data received from node: 16 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1) EXPLAIN :default_analyze_flags EXECUTE p1; Custom Scan (Citus Adaptive) (actual rows=6 loops=1) Task Count: 2 - Tuple data received from nodes: 9 bytes + Tuple data received from nodes: 24 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 5 bytes + Tuple data received from node: 16 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1) EXPLAIN :default_analyze_flags EXECUTE p1; Custom Scan (Citus Adaptive) (actual rows=6 loops=1) Task Count: 2 - Tuple data received from nodes: 9 bytes + Tuple data received from nodes: 24 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 5 bytes + Tuple data received from node: 16 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1) prepare p2 AS SELECT * FROM dist_table_rep1 WHERE a = $1; EXPLAIN :default_analyze_flags EXECUTE p2(1); Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 1) @@ -2501,10 +2503,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) EXPLAIN :default_analyze_flags EXECUTE p2(1); Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 1) @@ -2512,10 +2514,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) EXPLAIN :default_analyze_flags EXECUTE p2(1); Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 1) @@ -2523,10 +2525,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) EXPLAIN :default_analyze_flags EXECUTE p2(1); Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 1) @@ -2534,10 +2536,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) EXPLAIN :default_analyze_flags EXECUTE p2(1); Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 1) @@ -2545,10 +2547,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) EXPLAIN :default_analyze_flags EXECUTE p2(1); Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 1) @@ -2556,10 +2558,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) EXPLAIN :default_analyze_flags EXECUTE p2(10); Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 2 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 2 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 10) @@ -2567,10 +2569,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) EXPLAIN :default_analyze_flags EXECUTE p2(100); Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 3 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 3 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570023 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 100) @@ -2579,10 +2581,10 @@ prepare p3 AS SELECT * FROM dist_table_rep1 WHERE a = 1; EXPLAIN :default_analyze_flags EXECUTE p3; Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 1) @@ -2590,10 +2592,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) EXPLAIN :default_analyze_flags EXECUTE p3; Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 1) @@ -2601,10 +2603,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) EXPLAIN :default_analyze_flags EXECUTE p3; Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 1) @@ -2612,10 +2614,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) EXPLAIN :default_analyze_flags EXECUTE p3; Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 1) @@ -2623,10 +2625,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) EXPLAIN :default_analyze_flags EXECUTE p3; Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 1) @@ -2634,10 +2636,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1) EXPLAIN :default_analyze_flags EXECUTE p3; Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 1 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: All -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1) Filter: (a = 1) @@ -2860,10 +2862,10 @@ Custom Scan (Citus Adaptive) (actual rows=10 loops=1) Filter: (l_quantity < '-1'::numeric) Rows Removed by Filter: 2885 Task Count: 1 - Tuple data received from nodes: 11 bytes + Tuple data received from nodes: 40 bytes Tasks Shown: All -> Task - Tuple data received from node: 11 bytes + Tuple data received from node: 40 bytes Node: host=localhost port=xxxxx dbname=regression -> Function Scan on generate_series s (actual rows=10 loops=1) ROLLBACK; @@ -2920,7 +2922,7 @@ set citus.explain_analyze_sort_method to "taskId"; EXPLAIN (COSTS FALSE, ANALYZE TRUE, TIMING FALSE, SUMMARY FALSE) select a, CASE WHEN pg_sleep(0.4) IS NULL THEN 'x' END from explain_analyze_execution_time; Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 2 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: One of 2 -> Task Tuple data received from node: 0 bytes @@ -2930,10 +2932,10 @@ set citus.explain_analyze_sort_method to "execution-time"; EXPLAIN (COSTS FALSE, ANALYZE TRUE, TIMING FALSE, SUMMARY FALSE) select a, CASE WHEN pg_sleep(0.4) IS NULL THEN 'x' END from explain_analyze_execution_time; Custom Scan (Citus Adaptive) (actual rows=1 loops=1) Task Count: 2 - Tuple data received from nodes: 1 bytes + Tuple data received from nodes: 4 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 1 bytes + Tuple data received from node: 4 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on explain_analyze_execution_time_570030 explain_analyze_execution_time (actual rows=1 loops=1) -- reset back @@ -2987,10 +2989,10 @@ Limit (actual rows=1 loops=1) Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on distributed_table_1_570032 distributed_table_xxx (actual rows=1 loops=1) Task Count: 2 - Tuple data received from nodes: 3 bytes + Tuple data received from nodes: 16 bytes Tasks Shown: One of 2 -> Task - Tuple data received from node: 3 bytes + Tuple data received from node: 16 bytes Node: host=localhost port=xxxxx dbname=regression -> Limit (actual rows=1 loops=1) -> Nested Loop (actual rows=1 loops=1) diff --git a/src/test/regress/expected/multi_mx_function_call_delegation.out b/src/test/regress/expected/multi_mx_function_call_delegation.out index 309df4664..fe4e41efa 100644 --- a/src/test/regress/expected/multi_mx_function_call_delegation.out +++ b/src/test/regress/expected/multi_mx_function_call_delegation.out @@ -230,8 +230,6 @@ DEBUG: pushing down the function call (S,S) (1 row) --- this is fixed with pg14 and this will fail prior to --- pg 14 SET citus.enable_binary_protocol = TRUE; select mx_call_func_custom_types('S', 'A'); DEBUG: pushing down the function call diff --git a/src/test/regress/expected/multi_mx_function_call_delegation_0.out b/src/test/regress/expected/multi_mx_function_call_delegation_0.out index 835dd5263..034f8f5e8 100644 --- a/src/test/regress/expected/multi_mx_function_call_delegation_0.out +++ b/src/test/regress/expected/multi_mx_function_call_delegation_0.out @@ -230,15 +230,21 @@ DEBUG: pushing down the function call (S,S) (1 row) --- this is fixed with pg14 and this will fail prior to --- pg 14 SET citus.enable_binary_protocol = TRUE; select mx_call_func_custom_types('S', 'A'); DEBUG: pushing down the function call -ERROR: wrong data type: XXXX, expected XXXX + mx_call_func_custom_types +--------------------------------------------------------------------- + (S,S) +(1 row) + select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); DEBUG: pushing down the function call -ERROR: wrong data type: XXXX, expected XXXX + mx_call_func_custom_types +--------------------------------------------------------------------- + (S,S) +(1 row) + RESET citus.enable_binary_protocol; -- We don't allow distributing calls inside transactions begin; diff --git a/src/test/regress/expected/pg13.out b/src/test/regress/expected/pg13.out index 7b730ed64..01a93afe0 100644 --- a/src/test/regress/expected/pg13.out +++ b/src/test/regress/expected/pg13.out @@ -10,6 +10,8 @@ set search_path to test_pg13; SET citus.shard_replication_factor to 1; SET citus.shard_count to 2; SET citus.next_shard_id TO 65000; +-- Ensure tuple data in explain analyze output is the same on all PG versions +SET citus.enable_binary_protocol = TRUE; CREATE TABLE dist_table (name char, age int); CREATE INDEX name_index on dist_table(name); SELECT create_distributed_table('dist_table', 'name'); @@ -217,10 +219,10 @@ INSERT INTO test_wal VALUES(3,33),(4,44),(5,55) RETURNING *; --------------------------------------------------------------------- Custom Scan (Citus Adaptive) (actual rows=3 loops=1) Task Count: 1 - Tuple data received from nodes: 9 bytes + Tuple data received from nodes: 24 bytes Tasks Shown: All -> Task - Tuple data received from node: 9 bytes + Tuple data received from node: 24 bytes Node: host=localhost port=xxxxx dbname=regression -> Insert on test_wal_65012 citus_table_alias (actual rows=3 loops=1) WAL: records=3 bytes=189 diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index 6bf5af799..d1573522c 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -3,6 +3,8 @@ SET search_path TO single_node; SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; SET citus.next_shard_id TO 90630500; +-- Ensure tuple data in explain analyze output is the same on all PG versions +SET citus.enable_binary_protocol = TRUE; -- adding the coordinator as inactive is disallowed SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0); ERROR: coordinator node cannot be added as inactive node @@ -580,10 +582,10 @@ EXPLAIN (COSTS FALSE, ANALYZE TRUE, TIMING FALSE, SUMMARY FALSE) --------------------------------------------------------------------- Custom Scan (Citus Adaptive) (actual rows=5 loops=1) Task Count: 4 - Tuple data received from nodes: 10 bytes + Tuple data received from nodes: 40 bytes Tasks Shown: One of 4 -> Task - Tuple data received from node: 4 bytes + Tuple data received from node: 16 bytes Node: host=localhost port=xxxxx dbname=regression -> Seq Scan on test_90630506 test (actual rows=2 loops=1) (8 rows) diff --git a/src/test/regress/expected/subquery_complex_target_list.out b/src/test/regress/expected/subquery_complex_target_list.out index 8ac4fac2e..934f89224 100644 --- a/src/test/regress/expected/subquery_complex_target_list.out +++ b/src/test/regress/expected/subquery_complex_target_list.out @@ -129,6 +129,10 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo."?column? -- we reset the client min_messages here to avoid adding an alternative output -- for pg14 as the output slightly differs. RESET client_min_messages; +-- Set binary protocol temporarily to true always, to get consistent float +-- output across PG versions. +-- This is a no-op for PG_VERSION_NUM >= 14 +SET citus.enable_binary_protocol = TRUE; -- Expressions inside the aggregates -- parts of the query is inspired by TPCH queries SELECT @@ -172,10 +176,13 @@ FROM ORDER BY 1 DESC; avg | cnt_1 | cnt_2 | cnt_3 | sum_1 | l_year | pos | count_pay --------------------------------------------------------------------- - 30.14666771571734992301 | 3308.14619815793 | 2.5000000000000000 | | 31 | 2017 | 0 | 1 + 30.14666771571734992301 | 3308.14619815794 | 2.5000000000000000 | | 31 | 2017 | 0 | 1 (1 row) SET client_min_messages TO DEBUG1; +-- Reset binary protocol back after temporaribly changing it +-- This is a no-op for PG_VERSION_NUM >= 14 +RESET citus.enable_binary_protocol; -- Multiple columns in GROUP BYs -- foo needs to be recursively planned, bar can be pushded down SELECT diff --git a/src/test/regress/expected/worker_disable_binary_worker_copy_format.out b/src/test/regress/expected/worker_disable_binary_worker_copy_format.out new file mode 100644 index 000000000..1653912fc --- /dev/null +++ b/src/test/regress/expected/worker_disable_binary_worker_copy_format.out @@ -0,0 +1,20 @@ +-- The files we use in the following text use the text based worker copy +-- format. So we disable the binary worker copy format here. +-- This is a no-op for PG_VERSION_NUM < 14, because the default is off there. +ALTER SYSTEM SET citus.binary_worker_copy_format TO off; +SELECT pg_reload_conf(); + pg_reload_conf +--------------------------------------------------------------------- + t +(1 row) + +SELECT success FROM run_command_on_workers('ALTER SYSTEM SET citus.binary_worker_copy_format TO off'); + success +--------------------------------------------------------------------- +(0 rows) + +SELECT success FROM run_command_on_workers('SELECT pg_reload_conf()'); + success +--------------------------------------------------------------------- +(0 rows) + diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index 5fd945320..4c21e7ffe 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -468,6 +468,8 @@ if ($useMitmproxy) { # make tests reproducible by never trying to negotiate ssl push(@pgOptions, "citus.node_conninfo='sslmode=disable'"); + # The commands that we intercept are based on the the text based protocol. + push(@pgOptions, "citus.enable_binary_protocol='false'"); } elsif ($followercluster) { diff --git a/src/test/regress/sql/binary_protocol.sql b/src/test/regress/sql/binary_protocol.sql index 4fb2a0cac..2e721d33a 100644 --- a/src/test/regress/sql/binary_protocol.sql +++ b/src/test/regress/sql/binary_protocol.sql @@ -1,7 +1,7 @@ SET citus.shard_count = 2; SET citus.next_shard_id TO 4754000; CREATE SCHEMA binary_protocol; -SET search_path TO binary_protocol; +SET search_path TO binary_protocol, public; SET citus.enable_binary_protocol = TRUE; CREATE TABLE t(id int); @@ -19,8 +19,6 @@ SELECT id, id, id, id, id, id, id, id, id, id FROM t ORDER BY id; --- EXPLAIN ANALYZE is currently forced to use text protocol. Once that is --- changed the numbers reported should change. EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE) SELECT id FROM t ORDER BY 1; SET citus.explain_all_tasks TO ON; EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE) SELECT id FROM t ORDER BY 1; @@ -39,30 +37,30 @@ CREATE TYPE composite_type AS ( CREATE TABLE composite_type_table ( id bigserial, - col composite_type[] + col composite_type ); - SELECT create_distributed_table('composite_type_table', 'id'); -INSERT INTO composite_type_table(col) VALUES (ARRAY[(1, 2)::composite_type]); - -SELECT * FROM composite_type_table; CREATE TYPE nested_composite_type AS ( a composite_type, b composite_type ); +select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.composite_type_domain AS binary_protocol.composite_type$$); +select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.nested_composite_type_domain AS binary_protocol.nested_composite_type$$); -CREATE TABLE nested_composite_type_table -( - id bigserial, - col nested_composite_type -); -SELECT create_distributed_table('nested_composite_type_table', 'id'); -INSERT INTO nested_composite_type_table(col) VALUES (((1, 2), (3,4))::nested_composite_type); - -SELECT * FROM nested_composite_type_table; +INSERT INTO composite_type_table(col) VALUES ((1, 2)::composite_type); +SELECT col FROM composite_type_table; +SELECT col::composite_type_domain FROM composite_type_table; +SELECT (col, col) FROM composite_type_table; +SELECT (col, col)::nested_composite_type FROM composite_type_table; +SELECT (col, col)::nested_composite_type_domain FROM composite_type_table; +SELECT ARRAY[col] FROM composite_type_table; +SELECT ARRAY[col::composite_type_domain] FROM composite_type_table; +SELECT ARRAY[(col, col)] FROM composite_type_table; +SELECT ARRAY[(col, col)::nested_composite_type] FROM composite_type_table; +SELECT ARRAY[(col, col)::nested_composite_type_domain] FROM composite_type_table; CREATE TABLE binaryless_builtin ( @@ -71,8 +69,25 @@ col2 character varying(255) NOT NULL ); SELECT create_reference_table('binaryless_builtin'); +CREATE TYPE binaryless_composite_type AS ( + a aclitem, + b aclitem +); + +select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.binaryless_domain AS aclitem$$); +select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.binaryless_composite_domain AS binary_protocol.binaryless_composite_type$$); + INSERT INTO binaryless_builtin VALUES ('user postgres=r/postgres', 'test'); -SELECT * FROM binaryless_builtin; +SELECT col1 FROM binaryless_builtin; +SELECT col1::binaryless_domain FROM binaryless_builtin; +SELECT (col1, col1) FROM binaryless_builtin; +SELECT (col1, col1)::binaryless_composite_type FROM binaryless_builtin; +SELECT (col1, col1)::binaryless_composite_domain FROM binaryless_builtin; +SELECT ARRAY[col1] FROM binaryless_builtin; +SELECT ARRAY[col1::binaryless_domain] FROM binaryless_builtin; +SELECT ARRAY[(col1, col1)] FROM binaryless_builtin; +SELECT ARRAY[(col1, col1)::binaryless_composite_type] FROM binaryless_builtin; +SELECT ARRAY[(col1, col1)::binaryless_composite_domain] FROM binaryless_builtin; CREATE TABLE test_table_1(id int, val1 int); CREATE TABLE test_table_2(id int, val1 bigint); @@ -85,6 +100,5 @@ SELECT id, val1 FROM test_table_1 LEFT JOIN test_table_2 USING(id, val1) ORDER BY 1, 2; -\set VERBOSITY terse +SET client_min_messages TO WARNING; DROP SCHEMA binary_protocol CASCADE; - diff --git a/src/test/regress/sql/intermediate_results.sql b/src/test/regress/sql/intermediate_results.sql index b260f5ea2..ae5402345 100644 --- a/src/test/regress/sql/intermediate_results.sql +++ b/src/test/regress/sql/intermediate_results.sql @@ -187,7 +187,7 @@ SELECT broadcast_intermediate_result('stored_squares_1', -- query the intermediate result in a router query using text format SELECT * FROM interesting_squares JOIN ( SELECT * FROM - read_intermediate_results(ARRAY['stored_squares_1', 'stored_squares_2'], 'binary') AS res (x int, x2 int, z intermediate_results.square_type) + read_intermediate_results(ARRAY['stored_squares_1', 'stored_squares_2'], 'text') AS res (x int, x2 int, z intermediate_results.square_type) ) squares ON (squares.x::text = interested_in) WHERE user_id = 'jon' ORDER BY 1,2; diff --git a/src/test/regress/sql/limit_intermediate_size.sql b/src/test/regress/sql/limit_intermediate_size.sql index 420923f84..5b013bf75 100644 --- a/src/test/regress/sql/limit_intermediate_size.sql +++ b/src/test/regress/sql/limit_intermediate_size.sql @@ -1,5 +1,10 @@ SET citus.enable_repartition_joins to ON; +-- The intermediate result limits chosen below are based on text sizes of the +-- intermediate results. This is a no-op for PG_VERSION_NUM < 14, because the +-- default is false there. +SET citus.enable_binary_protocol = FALSE; + SET citus.max_intermediate_result_size TO 2; -- should fail because the copy size is ~4kB for each cte WITH cte AS MATERIALIZED diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index 77974a9a2..b38e7b2f1 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -204,6 +204,8 @@ INSERT INTO distributed_table SELECT 1, '1',15 FROM distributed_table WHERE key -- sanity check: multi-shard INSERT..SELECT pushdown goes through distributed execution INSERT INTO distributed_table SELECT * FROM distributed_table ON CONFLICT DO NOTHING; +-- Ensure tuple data in explain analyze output is the same on all PG versions +SET citus.enable_binary_protocol = TRUE; -- EXPLAIN for local execution just works fine -- though going through distributed execution diff --git a/src/test/regress/sql/multi_explain.sql b/src/test/regress/sql/multi_explain.sql index d58477f72..d2a89d3ee 100644 --- a/src/test/regress/sql/multi_explain.sql +++ b/src/test/regress/sql/multi_explain.sql @@ -9,6 +9,9 @@ SET citus.next_shard_id TO 570000; SET citus.explain_distributed_queries TO on; SET citus.enable_repartition_joins to ON; +-- Ensure tuple data in explain analyze output is the same on all PG versions +SET citus.enable_binary_protocol = TRUE; + -- Function that parses explain output as JSON CREATE FUNCTION explain_json(query text) RETURNS jsonb diff --git a/src/test/regress/sql/multi_mx_function_call_delegation.sql b/src/test/regress/sql/multi_mx_function_call_delegation.sql index 513146a4e..46d91ba39 100644 --- a/src/test/regress/sql/multi_mx_function_call_delegation.sql +++ b/src/test/regress/sql/multi_mx_function_call_delegation.sql @@ -102,15 +102,11 @@ select squares(4); select multi_mx_function_call_delegation.mx_call_func(2, 0); select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); - --- this is fixed with pg14 and this will fail prior to --- pg 14 SET citus.enable_binary_protocol = TRUE; select mx_call_func_custom_types('S', 'A'); select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A'); RESET citus.enable_binary_protocol; - -- We don't allow distributing calls inside transactions begin; select mx_call_func(2, 0); diff --git a/src/test/regress/sql/pg13.sql b/src/test/regress/sql/pg13.sql index 4145acf77..8c1e1b771 100644 --- a/src/test/regress/sql/pg13.sql +++ b/src/test/regress/sql/pg13.sql @@ -14,6 +14,9 @@ SET citus.shard_replication_factor to 1; SET citus.shard_count to 2; SET citus.next_shard_id TO 65000; +-- Ensure tuple data in explain analyze output is the same on all PG versions +SET citus.enable_binary_protocol = TRUE; + CREATE TABLE dist_table (name char, age int); CREATE INDEX name_index on dist_table(name); diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 228039d34..2ee0d456b 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -4,6 +4,9 @@ SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; SET citus.next_shard_id TO 90630500; +-- Ensure tuple data in explain analyze output is the same on all PG versions +SET citus.enable_binary_protocol = TRUE; + -- adding the coordinator as inactive is disallowed SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0); diff --git a/src/test/regress/sql/subquery_complex_target_list.sql b/src/test/regress/sql/subquery_complex_target_list.sql index 1579ff017..932fb1b73 100644 --- a/src/test/regress/sql/subquery_complex_target_list.sql +++ b/src/test/regress/sql/subquery_complex_target_list.sql @@ -90,6 +90,12 @@ FROM -- we reset the client min_messages here to avoid adding an alternative output -- for pg14 as the output slightly differs. RESET client_min_messages; + +-- Set binary protocol temporarily to true always, to get consistent float +-- output across PG versions. +-- This is a no-op for PG_VERSION_NUM >= 14 +SET citus.enable_binary_protocol = TRUE; + -- Expressions inside the aggregates -- parts of the query is inspired by TPCH queries SELECT @@ -133,6 +139,9 @@ FROM ORDER BY 1 DESC; SET client_min_messages TO DEBUG1; +-- Reset binary protocol back after temporaribly changing it +-- This is a no-op for PG_VERSION_NUM >= 14 +RESET citus.enable_binary_protocol; -- Multiple columns in GROUP BYs -- foo needs to be recursively planned, bar can be pushded down diff --git a/src/test/regress/sql/worker_disable_binary_worker_copy_format.sql b/src/test/regress/sql/worker_disable_binary_worker_copy_format.sql new file mode 100644 index 000000000..0482c6d9d --- /dev/null +++ b/src/test/regress/sql/worker_disable_binary_worker_copy_format.sql @@ -0,0 +1,8 @@ +-- The files we use in the following text use the text based worker copy +-- format. So we disable the binary worker copy format here. +-- This is a no-op for PG_VERSION_NUM < 14, because the default is off there. +ALTER SYSTEM SET citus.binary_worker_copy_format TO off; +SELECT pg_reload_conf(); +SELECT success FROM run_command_on_workers('ALTER SYSTEM SET citus.binary_worker_copy_format TO off'); +SELECT success FROM run_command_on_workers('SELECT pg_reload_conf()'); + diff --git a/src/test/regress/worker_schedule b/src/test/regress/worker_schedule index e702be378..65cf479b7 100644 --- a/src/test/regress/worker_schedule +++ b/src/test/regress/worker_schedule @@ -14,6 +14,7 @@ test: worker_copy # ---------- # Range and hash re-partitioning related regression tests # ---------- +test: worker_disable_binary_worker_copy_format test: worker_range_partition worker_range_partition_complex test: worker_hash_partition worker_hash_partition_complex test: worker_merge_range_files worker_merge_hash_files