Merge pull request #5242 from citusdata/enable-binary-protocol-on-pg14

Since PG14 we can now use binary encoding for arrays and composite types
that contain user defined types. This was fixed in this commit in
Postgres: 670c0a1d47

This change starts using that knowledge, by not necessarily falling back
to text encoding anymore for those types.

While doing this and testing a bit more I found various cases where
binary encoding would fail that our checks didn't cover. This fixes
those cases and adds tests for those. It also fixes EXPLAIN ANALYZE 
never using binary encoding, which was a leftover of workaround that
was not necessary anymore.

Finally, it changes the default for both `citus.enable_binary_protocol`
and `citus.binary_worker_copy_format` to `true` for PG14 and up. In our
cloud offering `binary_worker_copy_format` already was true by default.
`enable_binary_protocol` had some bug with MX and user defined types, 
this bug was fixed by the above mentioned fixes.
pull/5251/head
Jelte Fennema 2021-09-06 10:44:08 +02:00 committed by GitHub
commit 5f46372416
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 459 additions and 180 deletions

View File

@ -102,6 +102,7 @@
#include "libpq/pqformat.h"
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "parser/parse_type.h"
#if PG_VERSION_NUM >= PG_VERSION_13
#include "tcop/cmdtag.h"
#endif
@ -998,9 +999,8 @@ CanUseBinaryCopyFormatForTargetList(List *targetEntryList)
/*
* CanUseBinaryCopyFormatForType determines whether it is safe to use the
* binary copy format for the given type. The binary copy format cannot
* be used for arrays or composite types that contain user-defined types,
* or when there is no binary output function defined.
* binary copy format for the given type. See contents of the function for
* details of when it's safe to use binary copy.
*/
bool
CanUseBinaryCopyFormatForType(Oid typeId)
@ -1015,14 +1015,79 @@ CanUseBinaryCopyFormatForType(Oid typeId)
return false;
}
if (typeId >= FirstNormalObjectId)
/*
* A row type can contain any types, possibly types that don't have
* the binary input and output functions defined.
*/
if (type_is_rowtype(typeId))
{
char typeCategory = '\0';
bool typePreferred = false;
/*
* TODO: Inspect the types inside the record and check if all of them
* can be binary encoded. If so, it's safe to use binary encoding.
*
* IMPORTANT: When implementing this todo keep the following in mind:
*
* In PG versions before PG14 the record_recv function would error out
* more than necessary.
*
* It errors out when any of the columns in the row have a type oid
* that doesn't match with the oid in the received data. This happens
* pretty much always for non built in types, because their oids differ
* between postgres intallations. So for those Postgres versions we
* would need a check like the following for each column:
*
* if (columnType >= FirstNormalObjectId) {
* return false
* }
*/
return false;
}
get_type_category_preferred(typeId, &typeCategory, &typePreferred);
if (typeCategory == TYPCATEGORY_ARRAY ||
typeCategory == TYPCATEGORY_COMPOSITE)
HeapTuple typeTup = typeidType(typeId);
Form_pg_type type = (Form_pg_type) GETSTRUCT(typeTup);
Oid elementType = type->typelem;
#if PG_VERSION_NUM < PG_VERSION_14
char typeCategory = type->typcategory;
#endif
ReleaseSysCache(typeTup);
#if PG_VERSION_NUM < PG_VERSION_14
/*
* In PG versions before PG14 the array_recv function would error out more
* than necessary.
*
* It errors out when the element type its oids don't match with the oid in
* the received data. This happens pretty much always for non built in
* types, because their oids differ between postgres intallations. So we
* skip binary encoding when the element type is a non built in type.
*/
if (typeCategory == TYPCATEGORY_ARRAY && elementType >= FirstNormalObjectId)
{
return false;
}
#endif
/*
* Any type that is a wrapper around an element type (e.g. arrays and
* ranges) require the element type to also has support for binary
* encoding.
*/
if (elementType != InvalidOid)
{
if (!CanUseBinaryCopyFormatForType(elementType))
{
return false;
}
}
/*
* For domains, make sure that the underlying type can be binary copied.
*/
Oid baseTypeId = getBaseType(typeId);
if (typeId != baseTypeId)
{
if (!CanUseBinaryCopyFormatForType(baseTypeId))
{
return false;
}

View File

@ -476,7 +476,11 @@ struct TaskPlacementExecution;
/* GUC, determining whether Citus opens 1 connection per task */
bool ForceMaxQueryParallelization = false;
int MaxAdaptiveExecutorPoolSize = 16;
#if PG_VERSION_NUM >= PG_VERSION_14
bool EnableBinaryProtocol = true;
#else
bool EnableBinaryProtocol = false;
#endif
/* GUC, number of ms to wait between opening connections to the same worker */
int ExecutorSlowStartInterval = 10;
@ -2023,13 +2027,7 @@ SetAttributeInputMetadata(DistributedExecution *execution,
{
attInMetadata = NULL;
}
/*
* We only allow binary results when queryCount is 1, because we
* cannot use binary results with SendRemoteCommand. Which must be
* used if queryCount is larger than 1.
*/
else if (EnableBinaryProtocol && queryCount == 1 &&
CanUseBinaryCopyFormat(tupleDescriptor))
else if (EnableBinaryProtocol && CanUseBinaryCopyFormat(tupleDescriptor))
{
attInMetadata = TupleDescGetAttBinaryInMetadata(tupleDescriptor);
shardCommandExecution->binaryResults = true;

View File

@ -575,7 +575,11 @@ RegisterCitusConfigVariables(void)
"in PostgreSQL's binary serialization format when "
"joining large tables."),
&BinaryWorkerCopyFormat,
#if PG_VERSION_NUM >= PG_VERSION_14
true,
#else
false,
#endif
PGC_SIGHUP,
GUC_STANDARD,
NULL, NULL, NULL);
@ -742,7 +746,11 @@ RegisterCitusConfigVariables(void)
"Enables communication between nodes using binary protocol when possible"),
NULL,
&EnableBinaryProtocol,
#if PG_VERSION_NUM >= PG_VERSION_14
true,
#else
false,
#endif
PGC_USERSET,
GUC_STANDARD,
NULL, NULL, NULL);

View File

@ -49,7 +49,11 @@
/* Config variables managed via guc.c */
#if PG_VERSION_NUM >= PG_VERSION_14
bool BinaryWorkerCopyFormat = true; /* binary format for copying between workers */
#else
bool BinaryWorkerCopyFormat = false; /* binary format for copying between workers */
#endif
int PartitionBufferSize = 16384; /* total partitioning buffer size in KB */
/* Local variables */

View File

@ -1,7 +1,7 @@
SET citus.shard_count = 2;
SET citus.next_shard_id TO 4754000;
CREATE SCHEMA binary_protocol;
SET search_path TO binary_protocol;
SET search_path TO binary_protocol, public;
SET citus.enable_binary_protocol = TRUE;
CREATE TABLE t(id int);
SELECT create_distributed_table('t', 'id');
@ -48,8 +48,6 @@ SELECT id, id, id, id, id,
10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10
(10 rows)
-- EXPLAIN ANALYZE is currently forced to use text protocol. Once that is
-- changed the numbers reported should change.
EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE) SELECT id FROM t ORDER BY 1;
QUERY PLAN
---------------------------------------------------------------------
@ -58,10 +56,10 @@ EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE) SELECT id FROM
Sort Method: quicksort Memory: 25kB
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
Task Count: 2
Tuple data received from nodes: 11 bytes
Tuple data received from nodes: 40 bytes
Tasks Shown: One of 2
-> Task
Tuple data received from node: 8 bytes
Tuple data received from node: 28 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on t_4754000 t (actual rows=7 loops=1)
(11 rows)
@ -75,14 +73,14 @@ EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE) SELECT id FROM
Sort Method: quicksort Memory: 25kB
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
Task Count: 2
Tuple data received from nodes: 11 bytes
Tuple data received from nodes: 40 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 8 bytes
Tuple data received from node: 28 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on t_4754000 t (actual rows=7 loops=1)
-> Task
Tuple data received from node: 3 bytes
Tuple data received from node: 12 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on t_4754001 t (actual rows=3 loops=1)
(15 rows)
@ -123,7 +121,7 @@ CREATE TYPE composite_type AS (
CREATE TABLE composite_type_table
(
id bigserial,
col composite_type[]
col composite_type
);
SELECT create_distributed_table('composite_type_table', 'id');
create_distributed_table
@ -131,33 +129,81 @@ SELECT create_distributed_table('composite_type_table', 'id');
(1 row)
INSERT INTO composite_type_table(col) VALUES (ARRAY[(1, 2)::composite_type]);
SELECT * FROM composite_type_table;
id | col
---------------------------------------------------------------------
1 | {"(1,2)"}
(1 row)
CREATE TYPE nested_composite_type AS (
a composite_type,
b composite_type
);
CREATE TABLE nested_composite_type_table
(
id bigserial,
col nested_composite_type
);
SELECT create_distributed_table('nested_composite_type_table', 'id');
create_distributed_table
select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.composite_type_domain AS binary_protocol.composite_type$$);
run_command_on_master_and_workers
---------------------------------------------------------------------
(1 row)
INSERT INTO nested_composite_type_table(col) VALUES (((1, 2), (3,4))::nested_composite_type);
SELECT * FROM nested_composite_type_table;
id | col
select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.nested_composite_type_domain AS binary_protocol.nested_composite_type$$);
run_command_on_master_and_workers
---------------------------------------------------------------------
1 | ("(1,2)","(3,4)")
(1 row)
INSERT INTO composite_type_table(col) VALUES ((1, 2)::composite_type);
SELECT col FROM composite_type_table;
col
---------------------------------------------------------------------
(1,2)
(1 row)
SELECT col::composite_type_domain FROM composite_type_table;
col
---------------------------------------------------------------------
(1,2)
(1 row)
SELECT (col, col) FROM composite_type_table;
row
---------------------------------------------------------------------
("(1,2)","(1,2)")
(1 row)
SELECT (col, col)::nested_composite_type FROM composite_type_table;
row
---------------------------------------------------------------------
("(1,2)","(1,2)")
(1 row)
SELECT (col, col)::nested_composite_type_domain FROM composite_type_table;
row
---------------------------------------------------------------------
("(1,2)","(1,2)")
(1 row)
SELECT ARRAY[col] FROM composite_type_table;
array
---------------------------------------------------------------------
{"(1,2)"}
(1 row)
SELECT ARRAY[col::composite_type_domain] FROM composite_type_table;
array
---------------------------------------------------------------------
{"(1,2)"}
(1 row)
SELECT ARRAY[(col, col)] FROM composite_type_table;
array
---------------------------------------------------------------------
{"(\"(1,2)\",\"(1,2)\")"}
(1 row)
SELECT ARRAY[(col, col)::nested_composite_type] FROM composite_type_table;
array
---------------------------------------------------------------------
{"(\"(1,2)\",\"(1,2)\")"}
(1 row)
SELECT ARRAY[(col, col)::nested_composite_type_domain] FROM composite_type_table;
array
---------------------------------------------------------------------
{"(\"(1,2)\",\"(1,2)\")"}
(1 row)
CREATE TABLE binaryless_builtin (
@ -170,11 +216,81 @@ SELECT create_reference_table('binaryless_builtin');
(1 row)
INSERT INTO binaryless_builtin VALUES ('user postgres=r/postgres', 'test');
SELECT * FROM binaryless_builtin;
col1 | col2
CREATE TYPE binaryless_composite_type AS (
a aclitem,
b aclitem
);
select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.binaryless_domain AS aclitem$$);
run_command_on_master_and_workers
---------------------------------------------------------------------
postgres=r/postgres | test
(1 row)
select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.binaryless_composite_domain AS binary_protocol.binaryless_composite_type$$);
run_command_on_master_and_workers
---------------------------------------------------------------------
(1 row)
INSERT INTO binaryless_builtin VALUES ('user postgres=r/postgres', 'test');
SELECT col1 FROM binaryless_builtin;
col1
---------------------------------------------------------------------
postgres=r/postgres
(1 row)
SELECT col1::binaryless_domain FROM binaryless_builtin;
col1
---------------------------------------------------------------------
postgres=r/postgres
(1 row)
SELECT (col1, col1) FROM binaryless_builtin;
row
---------------------------------------------------------------------
(postgres=r/postgres,postgres=r/postgres)
(1 row)
SELECT (col1, col1)::binaryless_composite_type FROM binaryless_builtin;
row
---------------------------------------------------------------------
(postgres=r/postgres,postgres=r/postgres)
(1 row)
SELECT (col1, col1)::binaryless_composite_domain FROM binaryless_builtin;
row
---------------------------------------------------------------------
(postgres=r/postgres,postgres=r/postgres)
(1 row)
SELECT ARRAY[col1] FROM binaryless_builtin;
array
---------------------------------------------------------------------
{postgres=r/postgres}
(1 row)
SELECT ARRAY[col1::binaryless_domain] FROM binaryless_builtin;
array
---------------------------------------------------------------------
{postgres=r/postgres}
(1 row)
SELECT ARRAY[(col1, col1)] FROM binaryless_builtin;
array
---------------------------------------------------------------------
{"(postgres=r/postgres,postgres=r/postgres)"}
(1 row)
SELECT ARRAY[(col1, col1)::binaryless_composite_type] FROM binaryless_builtin;
array
---------------------------------------------------------------------
{"(postgres=r/postgres,postgres=r/postgres)"}
(1 row)
SELECT ARRAY[(col1, col1)::binaryless_composite_domain] FROM binaryless_builtin;
array
---------------------------------------------------------------------
{"(postgres=r/postgres,postgres=r/postgres)"}
(1 row)
CREATE TABLE test_table_1(id int, val1 int);
@ -203,6 +319,5 @@ ORDER BY 1, 2;
3 | 3
(3 rows)
\set VERBOSITY terse
SET client_min_messages TO WARNING;
DROP SCHEMA binary_protocol CASCADE;
NOTICE: drop cascades to 8 other objects

View File

@ -383,7 +383,7 @@ SELECT broadcast_intermediate_result('stored_squares_1',
-- query the intermediate result in a router query using text format
SELECT * FROM interesting_squares JOIN (
SELECT * FROM
read_intermediate_results(ARRAY['stored_squares_1', 'stored_squares_2'], 'binary') AS res (x int, x2 int, z intermediate_results.square_type)
read_intermediate_results(ARRAY['stored_squares_1', 'stored_squares_2'], 'text') AS res (x int, x2 int, z intermediate_results.square_type)
) squares
ON (squares.x::text = interested_in) WHERE user_id = 'jon' ORDER BY 1,2;
user_id | interested_in | x | x2 | z

View File

@ -1,4 +1,8 @@
SET citus.enable_repartition_joins to ON;
-- The intermediate result limits chosen below are based on text sizes of the
-- intermediate results. This is a no-op for PG_VERSION_NUM < 14, because the
-- default is false there.
SET citus.enable_binary_protocol = FALSE;
SET citus.max_intermediate_result_size TO 2;
-- should fail because the copy size is ~4kB for each cte
WITH cte AS MATERIALIZED

View File

@ -287,6 +287,8 @@ NOTICE: executing the command locally: INSERT INTO local_shard_execution.distri
INSERT INTO distributed_table SELECT 1, '1',15 FROM distributed_table WHERE key = 2 LIMIT 1 ON CONFLICT DO NOTHING;
-- sanity check: multi-shard INSERT..SELECT pushdown goes through distributed execution
INSERT INTO distributed_table SELECT * FROM distributed_table ON CONFLICT DO NOTHING;
-- Ensure tuple data in explain analyze output is the same on all PG versions
SET citus.enable_binary_protocol = TRUE;
-- EXPLAIN for local execution just works fine
-- though going through distributed execution
EXPLAIN (COSTS OFF) SELECT * FROM distributed_table WHERE key = 1 AND age = 20;
@ -307,10 +309,10 @@ EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM distribute
---------------------------------------------------------------------
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 5 bytes
Tuple data received from nodes: 14 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 5 bytes
Tuple data received from node: 14 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Index Scan using distributed_table_pkey_1470001 on distributed_table_1470001 distributed_table (actual rows=1 loops=1)
Index Cond: (key = 1)
@ -328,17 +330,17 @@ SELECT 1 FROM r WHERE z < 3;
Result destination: Write locally
-> Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Task Count: 4
Tuple data received from nodes: 6 bytes
Tuple data received from nodes: 22 bytes
Tasks Shown: One of 4
-> Task
Tuple data received from node: 6 bytes
Tuple data received from node: 22 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on distributed_table_1470001 distributed_table (actual rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 1 bytes
Tuple data received from nodes: 4 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 1 bytes
Tuple data received from node: 4 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1)
Filter: (z < '3'::double precision)

View File

@ -5,6 +5,8 @@ SET citus.next_shard_id TO 570000;
\a\t
SET citus.explain_distributed_queries TO on;
SET citus.enable_repartition_joins to ON;
-- Ensure tuple data in explain analyze output is the same on all PG versions
SET citus.enable_binary_protocol = TRUE;
-- Function that parses explain output as JSON
CREATE FUNCTION explain_json(query text)
RETURNS jsonb
@ -274,10 +276,10 @@ Sort (actual rows=50 loops=1)
Group Key: remote_scan.l_quantity
-> Custom Scan (Citus Adaptive) (actual rows=100 loops=1)
Task Count: 2
Tuple data received from nodes: 780 bytes
Tuple data received from nodes: 1800 bytes
Tasks Shown: One of 2
-> Task
Tuple data received from node: 390 bytes
Tuple data received from node: 900 bytes
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate (actual rows=50 loops=1)
Group Key: l_quantity
@ -294,7 +296,7 @@ EXPLAIN (COSTS off, ANALYZE on, TIMING off, SUMMARY off) SELECT count(*) FROM t1
Aggregate (actual rows=1 loops=1)
-> Custom Scan (Citus Adaptive) (actual rows=4 loops=1)
Task Count: 4
Tuple data received from nodes: 4 bytes
Tuple data received from nodes: 32 bytes
Tasks Shown: None, not supported for re-partition queries
-> MapMergeJob
Map Task Count: 3
@ -313,7 +315,7 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
-> Aggregate (actual rows=1 loops=1)
-> Custom Scan (Citus Adaptive) (actual rows=4 loops=1)
Task Count: 4
Tuple data received from nodes: 4 bytes
Tuple data received from nodes: 32 bytes
Tasks Shown: None, not supported for re-partition queries
-> MapMergeJob
Map Task Count: 3
@ -322,10 +324,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Map Task Count: 3
Merge Task Count: 4
Task Count: 1
Tuple data received from nodes: 1 bytes
Tuple data received from nodes: 8 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 1 bytes
Tuple data received from node: 8 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate (actual rows=1 loops=1)
-> Function Scan on read_intermediate_result intermediate_result (actual rows=1 loops=1)
@ -345,11 +347,11 @@ Sort (actual rows=50 loops=1)
-> Custom Scan (Citus Adaptive) (actual rows=100 loops=1)
Output: remote_scan.l_quantity, remote_scan.count_quantity
Task Count: 2
Tuple data received from nodes: 780 bytes
Tuple data received from nodes: 1800 bytes
Tasks Shown: One of 2
-> Task
Query: SELECT l_quantity, count(*) AS count_quantity FROM public.lineitem_290000 lineitem WHERE true GROUP BY l_quantity
Tuple data received from node: 390 bytes
Tuple data received from node: 900 bytes
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate (actual rows=50 loops=1)
Output: l_quantity, count(*)
@ -991,16 +993,16 @@ Sort (actual rows=50 loops=1)
Group Key: remote_scan.l_quantity
-> Custom Scan (Citus Adaptive) (actual rows=100 loops=1)
Task Count: 2
Tuple data received from nodes: 780 bytes
Tuple data received from nodes: 1800 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 390 bytes
Tuple data received from node: 900 bytes
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate (actual rows=50 loops=1)
Group Key: l_quantity
-> Seq Scan on lineitem_290000 lineitem (actual rows=6000 loops=1)
-> Task
Tuple data received from node: 390 bytes
Tuple data received from node: 900 bytes
Node: host=localhost port=xxxxx dbname=regression
-> HashAggregate (actual rows=50 loops=1)
Group Key: l_quantity
@ -1262,10 +1264,10 @@ Custom Scan (Citus Adaptive) (cost=0.00..0.00 rows=100000 width=18)
EXPLAIN (ANALYZE ON, COSTS OFF, TIMING OFF, SUMMARY OFF) EXECUTE router_executor_query_param(5);
Custom Scan (Citus Adaptive) (actual rows=3 loops=1)
Task Count: 1
Tuple data received from nodes: 15 bytes
Tuple data received from nodes: 30 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 15 bytes
Tuple data received from node: 30 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Index Scan using lineitem_pkey_290000 on lineitem_290000 lineitem (actual rows=3 loops=1)
Index Cond: (l_orderkey = 5)
@ -1867,10 +1869,10 @@ SELECT create_distributed_table('explain_analyze_test', 'a');
EXPLAIN :default_analyze_flags SELECT * FROM explain_analyze_test WHERE a = 1;
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 8 bytes
Tuple data received from nodes: 11 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 8 bytes
Tuple data received from node: 11 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=1 loops=1)
Filter: (a = 1)
@ -1879,10 +1881,10 @@ EXPLAIN :default_analyze_flags SELECT count(*) FROM explain_analyze_test;
Aggregate (actual rows=1 loops=1)
-> Custom Scan (Citus Adaptive) (actual rows=4 loops=1)
Task Count: 4
Tuple data received from nodes: 4 bytes
Tuple data received from nodes: 32 bytes
Tasks Shown: One of 4
-> Task
Tuple data received from node: 1 bytes
Tuple data received from node: 8 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate (actual rows=1 loops=1)
-> Seq Scan on explain_analyze_test_570009 explain_analyze_test (actual rows=1 loops=1)
@ -2248,17 +2250,17 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Result destination: Send to 2 nodes
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
Task Count: 4
Tuple data received from nodes: 21 bytes
Tuple data received from nodes: 120 bytes
Tasks Shown: One of 4
-> Task
Tuple data received from node: 9 bytes
Tuple data received from node: 48 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_570017 dist_table (actual rows=4 loops=1)
Task Count: 1
Tuple data received from nodes: 2 bytes
Tuple data received from nodes: 8 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 2 bytes
Tuple data received from node: 8 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate (actual rows=1 loops=1)
-> Hash Join (actual rows=10 loops=1)
@ -2271,10 +2273,10 @@ SELECT count(distinct a) FROM (SELECT GREATEST(random(), 2) r, a FROM dist_table
Aggregate (actual rows=1 loops=1)
-> Custom Scan (Citus Adaptive) (actual rows=4 loops=1)
Task Count: 4
Tuple data received from nodes: 4 bytes
Tuple data received from nodes: 32 bytes
Tasks Shown: One of 4
-> Task
Tuple data received from node: 1 bytes
Tuple data received from node: 8 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate (actual rows=1 loops=1)
-> Merge Join (actual rows=4 loops=1)
@ -2314,10 +2316,10 @@ Aggregate (actual rows=1 loops=1)
Sort Method: quicksort Memory: 25kB
-> Seq Scan on ref_table_570021 ref_table (actual rows=10 loops=1)
Task Count: 4
Tuple data received from nodes: 4 bytes
Tuple data received from nodes: 32 bytes
Tasks Shown: One of 4
-> Task
Tuple data received from node: 1 bytes
Tuple data received from node: 8 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate (actual rows=1 loops=1)
InitPlan 1 (returns $0)
@ -2340,10 +2342,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Result destination: Write locally
-> Custom Scan (Citus Adaptive) (actual rows=20 loops=1)
Task Count: 4
Tuple data received from nodes: 44 bytes
Tuple data received from nodes: 160 bytes
Tasks Shown: One of 4
-> Task
Tuple data received from node: 20 bytes
Tuple data received from node: 64 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Insert on dist_table_570017 citus_table_alias (actual rows=8 loops=1)
-> Seq Scan on dist_table_570017 dist_table (actual rows=8 loops=1)
@ -2353,17 +2355,17 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Result destination: Write locally
-> Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
Task Count: 1
Tuple data received from nodes: 28 bytes
Tuple data received from nodes: 50 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 28 bytes
Tuple data received from node: 50 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Function Scan on read_intermediate_result intermediate_result (actual rows=10 loops=1)
Task Count: 1
Tuple data received from nodes: 2 bytes
Tuple data received from nodes: 8 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 2 bytes
Tuple data received from node: 8 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Aggregate (actual rows=1 loops=1)
-> Function Scan on read_intermediate_result intermediate_result (actual rows=10 loops=1)
@ -2373,10 +2375,10 @@ prepare ref_select(int) AS select * from ref_table where 1 = $1;
explain :default_analyze_flags execute ref_select(1);
Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
Task Count: 1
Tuple data received from nodes: 11 bytes
Tuple data received from nodes: 40 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 11 bytes
Tuple data received from node: 40 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Result (actual rows=10 loops=1)
One-Time Filter: (1 = $1)
@ -2396,104 +2398,104 @@ SELECT create_distributed_table('dist_table_rep2', 'a');
EXPLAIN :default_analyze_flags INSERT INTO dist_table_rep1 VALUES(1), (2), (3), (4), (10), (100) RETURNING *;
Custom Scan (Citus Adaptive) (actual rows=6 loops=1)
Task Count: 2
Tuple data received from nodes: 9 bytes
Tuple data received from nodes: 24 bytes
Tasks Shown: One of 2
-> Task
Tuple data received from node: 5 bytes
Tuple data received from node: 16 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Insert on dist_table_rep1_570022 citus_table_alias (actual rows=4 loops=1)
-> Values Scan on "*VALUES*" (actual rows=4 loops=1)
EXPLAIN :default_analyze_flags SELECT * from dist_table_rep1;
Custom Scan (Citus Adaptive) (actual rows=6 loops=1)
Task Count: 2
Tuple data received from nodes: 9 bytes
Tuple data received from nodes: 24 bytes
Tasks Shown: One of 2
-> Task
Tuple data received from node: 5 bytes
Tuple data received from node: 16 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1)
EXPLAIN :default_analyze_flags INSERT INTO dist_table_rep2 VALUES(1), (2), (3), (4), (10), (100) RETURNING *;
Custom Scan (Citus Adaptive) (actual rows=6 loops=1)
Task Count: 2
Tuple data received from nodes: 18 bytes
Tuple data received from nodes: 48 bytes
Tasks Shown: One of 2
-> Task
Tuple data received from node: 10 bytes
Tuple data received from node: 32 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Insert on dist_table_rep2_570024 citus_table_alias (actual rows=4 loops=1)
-> Values Scan on "*VALUES*" (actual rows=4 loops=1)
EXPLAIN :default_analyze_flags SELECT * from dist_table_rep2;
Custom Scan (Citus Adaptive) (actual rows=6 loops=1)
Task Count: 2
Tuple data received from nodes: 9 bytes
Tuple data received from nodes: 24 bytes
Tasks Shown: One of 2
-> Task
Tuple data received from node: 5 bytes
Tuple data received from node: 16 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_rep2_570024 dist_table_rep2 (actual rows=4 loops=1)
prepare p1 as SELECT * FROM dist_table_rep1;
EXPLAIN :default_analyze_flags EXECUTE p1;
Custom Scan (Citus Adaptive) (actual rows=6 loops=1)
Task Count: 2
Tuple data received from nodes: 9 bytes
Tuple data received from nodes: 24 bytes
Tasks Shown: One of 2
-> Task
Tuple data received from node: 5 bytes
Tuple data received from node: 16 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1)
EXPLAIN :default_analyze_flags EXECUTE p1;
Custom Scan (Citus Adaptive) (actual rows=6 loops=1)
Task Count: 2
Tuple data received from nodes: 9 bytes
Tuple data received from nodes: 24 bytes
Tasks Shown: One of 2
-> Task
Tuple data received from node: 5 bytes
Tuple data received from node: 16 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1)
EXPLAIN :default_analyze_flags EXECUTE p1;
Custom Scan (Citus Adaptive) (actual rows=6 loops=1)
Task Count: 2
Tuple data received from nodes: 9 bytes
Tuple data received from nodes: 24 bytes
Tasks Shown: One of 2
-> Task
Tuple data received from node: 5 bytes
Tuple data received from node: 16 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1)
EXPLAIN :default_analyze_flags EXECUTE p1;
Custom Scan (Citus Adaptive) (actual rows=6 loops=1)
Task Count: 2
Tuple data received from nodes: 9 bytes
Tuple data received from nodes: 24 bytes
Tasks Shown: One of 2
-> Task
Tuple data received from node: 5 bytes
Tuple data received from node: 16 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1)
EXPLAIN :default_analyze_flags EXECUTE p1;
Custom Scan (Citus Adaptive) (actual rows=6 loops=1)
Task Count: 2
Tuple data received from nodes: 9 bytes
Tuple data received from nodes: 24 bytes
Tasks Shown: One of 2
-> Task
Tuple data received from node: 5 bytes
Tuple data received from node: 16 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1)
EXPLAIN :default_analyze_flags EXECUTE p1;
Custom Scan (Citus Adaptive) (actual rows=6 loops=1)
Task Count: 2
Tuple data received from nodes: 9 bytes
Tuple data received from nodes: 24 bytes
Tasks Shown: One of 2
-> Task
Tuple data received from node: 5 bytes
Tuple data received from node: 16 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=4 loops=1)
prepare p2 AS SELECT * FROM dist_table_rep1 WHERE a = $1;
EXPLAIN :default_analyze_flags EXECUTE p2(1);
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 1 bytes
Tuple data received from nodes: 4 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 1 bytes
Tuple data received from node: 4 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1)
Filter: (a = 1)
@ -2501,10 +2503,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
EXPLAIN :default_analyze_flags EXECUTE p2(1);
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 1 bytes
Tuple data received from nodes: 4 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 1 bytes
Tuple data received from node: 4 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1)
Filter: (a = 1)
@ -2512,10 +2514,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
EXPLAIN :default_analyze_flags EXECUTE p2(1);
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 1 bytes
Tuple data received from nodes: 4 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 1 bytes
Tuple data received from node: 4 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1)
Filter: (a = 1)
@ -2523,10 +2525,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
EXPLAIN :default_analyze_flags EXECUTE p2(1);
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 1 bytes
Tuple data received from nodes: 4 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 1 bytes
Tuple data received from node: 4 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1)
Filter: (a = 1)
@ -2534,10 +2536,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
EXPLAIN :default_analyze_flags EXECUTE p2(1);
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 1 bytes
Tuple data received from nodes: 4 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 1 bytes
Tuple data received from node: 4 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1)
Filter: (a = 1)
@ -2545,10 +2547,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
EXPLAIN :default_analyze_flags EXECUTE p2(1);
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 1 bytes
Tuple data received from nodes: 4 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 1 bytes
Tuple data received from node: 4 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1)
Filter: (a = 1)
@ -2556,10 +2558,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
EXPLAIN :default_analyze_flags EXECUTE p2(10);
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 2 bytes
Tuple data received from nodes: 4 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 2 bytes
Tuple data received from node: 4 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1)
Filter: (a = 10)
@ -2567,10 +2569,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
EXPLAIN :default_analyze_flags EXECUTE p2(100);
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 3 bytes
Tuple data received from nodes: 4 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 3 bytes
Tuple data received from node: 4 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_rep1_570023 dist_table_rep1 (actual rows=1 loops=1)
Filter: (a = 100)
@ -2579,10 +2581,10 @@ prepare p3 AS SELECT * FROM dist_table_rep1 WHERE a = 1;
EXPLAIN :default_analyze_flags EXECUTE p3;
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 1 bytes
Tuple data received from nodes: 4 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 1 bytes
Tuple data received from node: 4 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1)
Filter: (a = 1)
@ -2590,10 +2592,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
EXPLAIN :default_analyze_flags EXECUTE p3;
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 1 bytes
Tuple data received from nodes: 4 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 1 bytes
Tuple data received from node: 4 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1)
Filter: (a = 1)
@ -2601,10 +2603,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
EXPLAIN :default_analyze_flags EXECUTE p3;
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 1 bytes
Tuple data received from nodes: 4 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 1 bytes
Tuple data received from node: 4 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1)
Filter: (a = 1)
@ -2612,10 +2614,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
EXPLAIN :default_analyze_flags EXECUTE p3;
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 1 bytes
Tuple data received from nodes: 4 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 1 bytes
Tuple data received from node: 4 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1)
Filter: (a = 1)
@ -2623,10 +2625,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
EXPLAIN :default_analyze_flags EXECUTE p3;
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 1 bytes
Tuple data received from nodes: 4 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 1 bytes
Tuple data received from node: 4 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1)
Filter: (a = 1)
@ -2634,10 +2636,10 @@ Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
EXPLAIN :default_analyze_flags EXECUTE p3;
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Task Count: 1
Tuple data received from nodes: 1 bytes
Tuple data received from nodes: 4 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 1 bytes
Tuple data received from node: 4 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on dist_table_rep1_570022 dist_table_rep1 (actual rows=1 loops=1)
Filter: (a = 1)
@ -2860,10 +2862,10 @@ Custom Scan (Citus Adaptive) (actual rows=10 loops=1)
Filter: (l_quantity < '-1'::numeric)
Rows Removed by Filter: 2885
Task Count: 1
Tuple data received from nodes: 11 bytes
Tuple data received from nodes: 40 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 11 bytes
Tuple data received from node: 40 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Function Scan on generate_series s (actual rows=10 loops=1)
ROLLBACK;
@ -2920,7 +2922,7 @@ set citus.explain_analyze_sort_method to "taskId";
EXPLAIN (COSTS FALSE, ANALYZE TRUE, TIMING FALSE, SUMMARY FALSE) select a, CASE WHEN pg_sleep(0.4) IS NULL THEN 'x' END from explain_analyze_execution_time;
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Task Count: 2
Tuple data received from nodes: 1 bytes
Tuple data received from nodes: 4 bytes
Tasks Shown: One of 2
-> Task
Tuple data received from node: 0 bytes
@ -2930,10 +2932,10 @@ set citus.explain_analyze_sort_method to "execution-time";
EXPLAIN (COSTS FALSE, ANALYZE TRUE, TIMING FALSE, SUMMARY FALSE) select a, CASE WHEN pg_sleep(0.4) IS NULL THEN 'x' END from explain_analyze_execution_time;
Custom Scan (Citus Adaptive) (actual rows=1 loops=1)
Task Count: 2
Tuple data received from nodes: 1 bytes
Tuple data received from nodes: 4 bytes
Tasks Shown: One of 2
-> Task
Tuple data received from node: 1 bytes
Tuple data received from node: 4 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on explain_analyze_execution_time_570030 explain_analyze_execution_time (actual rows=1 loops=1)
-- reset back
@ -2987,10 +2989,10 @@ Limit (actual rows=1 loops=1)
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on distributed_table_1_570032 distributed_table_xxx (actual rows=1 loops=1)
Task Count: 2
Tuple data received from nodes: 3 bytes
Tuple data received from nodes: 16 bytes
Tasks Shown: One of 2
-> Task
Tuple data received from node: 3 bytes
Tuple data received from node: 16 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Limit (actual rows=1 loops=1)
-> Nested Loop (actual rows=1 loops=1)

View File

@ -230,8 +230,6 @@ DEBUG: pushing down the function call
(S,S)
(1 row)
-- this is fixed with pg14 and this will fail prior to
-- pg 14
SET citus.enable_binary_protocol = TRUE;
select mx_call_func_custom_types('S', 'A');
DEBUG: pushing down the function call

View File

@ -230,15 +230,21 @@ DEBUG: pushing down the function call
(S,S)
(1 row)
-- this is fixed with pg14 and this will fail prior to
-- pg 14
SET citus.enable_binary_protocol = TRUE;
select mx_call_func_custom_types('S', 'A');
DEBUG: pushing down the function call
ERROR: wrong data type: XXXX, expected XXXX
mx_call_func_custom_types
---------------------------------------------------------------------
(S,S)
(1 row)
select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');
DEBUG: pushing down the function call
ERROR: wrong data type: XXXX, expected XXXX
mx_call_func_custom_types
---------------------------------------------------------------------
(S,S)
(1 row)
RESET citus.enable_binary_protocol;
-- We don't allow distributing calls inside transactions
begin;

View File

@ -10,6 +10,8 @@ set search_path to test_pg13;
SET citus.shard_replication_factor to 1;
SET citus.shard_count to 2;
SET citus.next_shard_id TO 65000;
-- Ensure tuple data in explain analyze output is the same on all PG versions
SET citus.enable_binary_protocol = TRUE;
CREATE TABLE dist_table (name char, age int);
CREATE INDEX name_index on dist_table(name);
SELECT create_distributed_table('dist_table', 'name');
@ -217,10 +219,10 @@ INSERT INTO test_wal VALUES(3,33),(4,44),(5,55) RETURNING *;
---------------------------------------------------------------------
Custom Scan (Citus Adaptive) (actual rows=3 loops=1)
Task Count: 1
Tuple data received from nodes: 9 bytes
Tuple data received from nodes: 24 bytes
Tasks Shown: All
-> Task
Tuple data received from node: 9 bytes
Tuple data received from node: 24 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Insert on test_wal_65012 citus_table_alias (actual rows=3 loops=1)
WAL: records=3 bytes=189

View File

@ -3,6 +3,8 @@ SET search_path TO single_node;
SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 90630500;
-- Ensure tuple data in explain analyze output is the same on all PG versions
SET citus.enable_binary_protocol = TRUE;
-- adding the coordinator as inactive is disallowed
SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0);
ERROR: coordinator node cannot be added as inactive node
@ -580,10 +582,10 @@ EXPLAIN (COSTS FALSE, ANALYZE TRUE, TIMING FALSE, SUMMARY FALSE)
---------------------------------------------------------------------
Custom Scan (Citus Adaptive) (actual rows=5 loops=1)
Task Count: 4
Tuple data received from nodes: 10 bytes
Tuple data received from nodes: 40 bytes
Tasks Shown: One of 4
-> Task
Tuple data received from node: 4 bytes
Tuple data received from node: 16 bytes
Node: host=localhost port=xxxxx dbname=regression
-> Seq Scan on test_90630506 test (actual rows=2 loops=1)
(8 rows)

View File

@ -129,6 +129,10 @@ DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT foo."?column?
-- we reset the client min_messages here to avoid adding an alternative output
-- for pg14 as the output slightly differs.
RESET client_min_messages;
-- Set binary protocol temporarily to true always, to get consistent float
-- output across PG versions.
-- This is a no-op for PG_VERSION_NUM >= 14
SET citus.enable_binary_protocol = TRUE;
-- Expressions inside the aggregates
-- parts of the query is inspired by TPCH queries
SELECT
@ -172,10 +176,13 @@ FROM
ORDER BY 1 DESC;
avg | cnt_1 | cnt_2 | cnt_3 | sum_1 | l_year | pos | count_pay
---------------------------------------------------------------------
30.14666771571734992301 | 3308.14619815793 | 2.5000000000000000 | | 31 | 2017 | 0 | 1
30.14666771571734992301 | 3308.14619815794 | 2.5000000000000000 | | 31 | 2017 | 0 | 1
(1 row)
SET client_min_messages TO DEBUG1;
-- Reset binary protocol back after temporaribly changing it
-- This is a no-op for PG_VERSION_NUM >= 14
RESET citus.enable_binary_protocol;
-- Multiple columns in GROUP BYs
-- foo needs to be recursively planned, bar can be pushded down
SELECT

View File

@ -0,0 +1,20 @@
-- The files we use in the following text use the text based worker copy
-- format. So we disable the binary worker copy format here.
-- This is a no-op for PG_VERSION_NUM < 14, because the default is off there.
ALTER SYSTEM SET citus.binary_worker_copy_format TO off;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT success FROM run_command_on_workers('ALTER SYSTEM SET citus.binary_worker_copy_format TO off');
success
---------------------------------------------------------------------
(0 rows)
SELECT success FROM run_command_on_workers('SELECT pg_reload_conf()');
success
---------------------------------------------------------------------
(0 rows)

View File

@ -468,6 +468,8 @@ if ($useMitmproxy)
{
# make tests reproducible by never trying to negotiate ssl
push(@pgOptions, "citus.node_conninfo='sslmode=disable'");
# The commands that we intercept are based on the the text based protocol.
push(@pgOptions, "citus.enable_binary_protocol='false'");
}
elsif ($followercluster)
{

View File

@ -1,7 +1,7 @@
SET citus.shard_count = 2;
SET citus.next_shard_id TO 4754000;
CREATE SCHEMA binary_protocol;
SET search_path TO binary_protocol;
SET search_path TO binary_protocol, public;
SET citus.enable_binary_protocol = TRUE;
CREATE TABLE t(id int);
@ -19,8 +19,6 @@ SELECT id, id, id, id, id,
id, id, id, id, id
FROM t ORDER BY id;
-- EXPLAIN ANALYZE is currently forced to use text protocol. Once that is
-- changed the numbers reported should change.
EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE) SELECT id FROM t ORDER BY 1;
SET citus.explain_all_tasks TO ON;
EXPLAIN (ANALYZE TRUE, TIMING FALSE, COSTS FALSE, SUMMARY FALSE) SELECT id FROM t ORDER BY 1;
@ -39,30 +37,30 @@ CREATE TYPE composite_type AS (
CREATE TABLE composite_type_table
(
id bigserial,
col composite_type[]
col composite_type
);
SELECT create_distributed_table('composite_type_table', 'id');
INSERT INTO composite_type_table(col) VALUES (ARRAY[(1, 2)::composite_type]);
SELECT * FROM composite_type_table;
CREATE TYPE nested_composite_type AS (
a composite_type,
b composite_type
);
select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.composite_type_domain AS binary_protocol.composite_type$$);
select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.nested_composite_type_domain AS binary_protocol.nested_composite_type$$);
CREATE TABLE nested_composite_type_table
(
id bigserial,
col nested_composite_type
);
SELECT create_distributed_table('nested_composite_type_table', 'id');
INSERT INTO nested_composite_type_table(col) VALUES (((1, 2), (3,4))::nested_composite_type);
SELECT * FROM nested_composite_type_table;
INSERT INTO composite_type_table(col) VALUES ((1, 2)::composite_type);
SELECT col FROM composite_type_table;
SELECT col::composite_type_domain FROM composite_type_table;
SELECT (col, col) FROM composite_type_table;
SELECT (col, col)::nested_composite_type FROM composite_type_table;
SELECT (col, col)::nested_composite_type_domain FROM composite_type_table;
SELECT ARRAY[col] FROM composite_type_table;
SELECT ARRAY[col::composite_type_domain] FROM composite_type_table;
SELECT ARRAY[(col, col)] FROM composite_type_table;
SELECT ARRAY[(col, col)::nested_composite_type] FROM composite_type_table;
SELECT ARRAY[(col, col)::nested_composite_type_domain] FROM composite_type_table;
CREATE TABLE binaryless_builtin (
@ -71,8 +69,25 @@ col2 character varying(255) NOT NULL
);
SELECT create_reference_table('binaryless_builtin');
CREATE TYPE binaryless_composite_type AS (
a aclitem,
b aclitem
);
select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.binaryless_domain AS aclitem$$);
select run_command_on_master_and_workers($$CREATE DOMAIN binary_protocol.binaryless_composite_domain AS binary_protocol.binaryless_composite_type$$);
INSERT INTO binaryless_builtin VALUES ('user postgres=r/postgres', 'test');
SELECT * FROM binaryless_builtin;
SELECT col1 FROM binaryless_builtin;
SELECT col1::binaryless_domain FROM binaryless_builtin;
SELECT (col1, col1) FROM binaryless_builtin;
SELECT (col1, col1)::binaryless_composite_type FROM binaryless_builtin;
SELECT (col1, col1)::binaryless_composite_domain FROM binaryless_builtin;
SELECT ARRAY[col1] FROM binaryless_builtin;
SELECT ARRAY[col1::binaryless_domain] FROM binaryless_builtin;
SELECT ARRAY[(col1, col1)] FROM binaryless_builtin;
SELECT ARRAY[(col1, col1)::binaryless_composite_type] FROM binaryless_builtin;
SELECT ARRAY[(col1, col1)::binaryless_composite_domain] FROM binaryless_builtin;
CREATE TABLE test_table_1(id int, val1 int);
CREATE TABLE test_table_2(id int, val1 bigint);
@ -85,6 +100,5 @@ SELECT id, val1
FROM test_table_1 LEFT JOIN test_table_2 USING(id, val1)
ORDER BY 1, 2;
\set VERBOSITY terse
SET client_min_messages TO WARNING;
DROP SCHEMA binary_protocol CASCADE;

View File

@ -187,7 +187,7 @@ SELECT broadcast_intermediate_result('stored_squares_1',
-- query the intermediate result in a router query using text format
SELECT * FROM interesting_squares JOIN (
SELECT * FROM
read_intermediate_results(ARRAY['stored_squares_1', 'stored_squares_2'], 'binary') AS res (x int, x2 int, z intermediate_results.square_type)
read_intermediate_results(ARRAY['stored_squares_1', 'stored_squares_2'], 'text') AS res (x int, x2 int, z intermediate_results.square_type)
) squares
ON (squares.x::text = interested_in) WHERE user_id = 'jon' ORDER BY 1,2;

View File

@ -1,5 +1,10 @@
SET citus.enable_repartition_joins to ON;
-- The intermediate result limits chosen below are based on text sizes of the
-- intermediate results. This is a no-op for PG_VERSION_NUM < 14, because the
-- default is false there.
SET citus.enable_binary_protocol = FALSE;
SET citus.max_intermediate_result_size TO 2;
-- should fail because the copy size is ~4kB for each cte
WITH cte AS MATERIALIZED

View File

@ -204,6 +204,8 @@ INSERT INTO distributed_table SELECT 1, '1',15 FROM distributed_table WHERE key
-- sanity check: multi-shard INSERT..SELECT pushdown goes through distributed execution
INSERT INTO distributed_table SELECT * FROM distributed_table ON CONFLICT DO NOTHING;
-- Ensure tuple data in explain analyze output is the same on all PG versions
SET citus.enable_binary_protocol = TRUE;
-- EXPLAIN for local execution just works fine
-- though going through distributed execution

View File

@ -9,6 +9,9 @@ SET citus.next_shard_id TO 570000;
SET citus.explain_distributed_queries TO on;
SET citus.enable_repartition_joins to ON;
-- Ensure tuple data in explain analyze output is the same on all PG versions
SET citus.enable_binary_protocol = TRUE;
-- Function that parses explain output as JSON
CREATE FUNCTION explain_json(query text)
RETURNS jsonb

View File

@ -102,15 +102,11 @@ select squares(4);
select multi_mx_function_call_delegation.mx_call_func(2, 0);
select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');
-- this is fixed with pg14 and this will fail prior to
-- pg 14
SET citus.enable_binary_protocol = TRUE;
select mx_call_func_custom_types('S', 'A');
select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');
RESET citus.enable_binary_protocol;
-- We don't allow distributing calls inside transactions
begin;
select mx_call_func(2, 0);

View File

@ -14,6 +14,9 @@ SET citus.shard_replication_factor to 1;
SET citus.shard_count to 2;
SET citus.next_shard_id TO 65000;
-- Ensure tuple data in explain analyze output is the same on all PG versions
SET citus.enable_binary_protocol = TRUE;
CREATE TABLE dist_table (name char, age int);
CREATE INDEX name_index on dist_table(name);

View File

@ -4,6 +4,9 @@ SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 90630500;
-- Ensure tuple data in explain analyze output is the same on all PG versions
SET citus.enable_binary_protocol = TRUE;
-- adding the coordinator as inactive is disallowed
SELECT 1 FROM master_add_inactive_node('localhost', :master_port, groupid => 0);

View File

@ -90,6 +90,12 @@ FROM
-- we reset the client min_messages here to avoid adding an alternative output
-- for pg14 as the output slightly differs.
RESET client_min_messages;
-- Set binary protocol temporarily to true always, to get consistent float
-- output across PG versions.
-- This is a no-op for PG_VERSION_NUM >= 14
SET citus.enable_binary_protocol = TRUE;
-- Expressions inside the aggregates
-- parts of the query is inspired by TPCH queries
SELECT
@ -133,6 +139,9 @@ FROM
ORDER BY 1 DESC;
SET client_min_messages TO DEBUG1;
-- Reset binary protocol back after temporaribly changing it
-- This is a no-op for PG_VERSION_NUM >= 14
RESET citus.enable_binary_protocol;
-- Multiple columns in GROUP BYs
-- foo needs to be recursively planned, bar can be pushded down

View File

@ -0,0 +1,8 @@
-- The files we use in the following text use the text based worker copy
-- format. So we disable the binary worker copy format here.
-- This is a no-op for PG_VERSION_NUM < 14, because the default is off there.
ALTER SYSTEM SET citus.binary_worker_copy_format TO off;
SELECT pg_reload_conf();
SELECT success FROM run_command_on_workers('ALTER SYSTEM SET citus.binary_worker_copy_format TO off');
SELECT success FROM run_command_on_workers('SELECT pg_reload_conf()');

View File

@ -14,6 +14,7 @@ test: worker_copy
# ----------
# Range and hash re-partitioning related regression tests
# ----------
test: worker_disable_binary_worker_copy_format
test: worker_range_partition worker_range_partition_complex
test: worker_hash_partition worker_hash_partition_complex
test: worker_merge_range_files worker_merge_hash_files