From e0ada050aa1c544c6566b761b30f77d2810dfbe5 Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Tue, 23 Aug 2022 16:38:00 +0200 Subject: [PATCH] Enable binary logical replication for shard moves (#6017) Using binary encoding can save a lot of CPU cycles, both on the sender and on the receiver. Since the walsender and walreceiver processes are single threaded, this can matter a lot for the throughput if they are bottlenecked on CPU. This feature is only available in PG14, not PG13. It should be safe to always enable because it's only used for types that support binary encoding according to the PG docs: > Even when this option is enabled, only data types that have binary > send and receive functions will be transferred in binary. But in case it causes problems, it can still be disabled by setting `citus.enable_binary_protocol` to `false`. --- .../replication/multi_logical_replication.c | 12 ++++- ...enterprise_isolation_logicalrep_3_schedule | 1 + src/test/regress/expected/binary_protocol.out | 8 +++ src/test/regress/expected/cpu_priority.out | 43 ++++++++------- ...olation_logical_replication_binaryless.out | 48 +++++++++++++++++ src/test/regress/expected/pg14.out | 28 ++++++++++ ...lation_logical_replication_binaryless.spec | 52 +++++++++++++++++++ src/test/regress/sql/binary_protocol.sql | 3 ++ src/test/regress/sql/cpu_priority.sql | 3 ++ src/test/regress/sql/pg14.sql | 16 ++++++ 10 files changed, 193 insertions(+), 21 deletions(-) create mode 100644 src/test/regress/expected/isolation_logical_replication_binaryless.out create mode 100644 src/test/regress/spec/isolation_logical_replication_binaryless.spec diff --git a/src/backend/distributed/replication/multi_logical_replication.c b/src/backend/distributed/replication/multi_logical_replication.c index fae6f7c39..f91025b2e 100644 --- a/src/backend/distributed/replication/multi_logical_replication.c +++ b/src/backend/distributed/replication/multi_logical_replication.c @@ -1807,12 +1807,22 @@ CreateSubscriptions(MultiConnection *sourceConnection, appendStringInfo(createSubscriptionCommand, "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s " "WITH (citus_use_authinfo=true, create_slot=false, " - " copy_data=false, enabled=false, slot_name=%s)", + "copy_data=false, enabled=false, slot_name=%s", quote_identifier(target->subscriptionName), quote_literal_cstr(conninfo->data), quote_identifier(target->publication->name), quote_identifier(target->replicationSlot->name)); + if (EnableBinaryProtocol && PG_VERSION_NUM >= PG_VERSION_14) + { + appendStringInfoString(createSubscriptionCommand, ", binary=true)"); + } + else + { + appendStringInfoString(createSubscriptionCommand, ")"); + } + + ExecuteCriticalRemoteCommand(target->superuserConnection, createSubscriptionCommand->data); pfree(createSubscriptionCommand->data); diff --git a/src/test/regress/enterprise_isolation_logicalrep_3_schedule b/src/test/regress/enterprise_isolation_logicalrep_3_schedule index 1d38764bb..105dcc049 100644 --- a/src/test/regress/enterprise_isolation_logicalrep_3_schedule +++ b/src/test/regress/enterprise_isolation_logicalrep_3_schedule @@ -6,3 +6,4 @@ test: isolation_setup test: isolation_cluster_management test: isolation_logical_replication_with_partitioning +test: isolation_logical_replication_binaryless diff --git a/src/test/regress/expected/binary_protocol.out b/src/test/regress/expected/binary_protocol.out index 85fbbdecf..4b9a0eb18 100644 --- a/src/test/regress/expected/binary_protocol.out +++ b/src/test/regress/expected/binary_protocol.out @@ -1,4 +1,5 @@ SET citus.shard_count = 2; +SET citus.shard_replication_factor TO 1; SET citus.next_shard_id TO 4754000; CREATE SCHEMA binary_protocol; SET search_path TO binary_protocol, public; @@ -196,6 +197,13 @@ SELECT ARRAY[(col, col)::nested_composite_type_domain] FROM composite_type_table {"(\"(1,2)\",\"(1,2)\")"} (1 row) +-- Confirm that aclitem doesn't have receive and send functions +SELECT typreceive, typsend FROM pg_type WHERE typname = 'aclitem'; + typreceive | typsend +--------------------------------------------------------------------- + - | - +(1 row) + CREATE TABLE binaryless_builtin ( col1 aclitem NOT NULL, col2 character varying(255) NOT NULL diff --git a/src/test/regress/expected/cpu_priority.out b/src/test/regress/expected/cpu_priority.out index 5a397ef1c..008eacafd 100644 --- a/src/test/regress/expected/cpu_priority.out +++ b/src/test/regress/expected/cpu_priority.out @@ -85,14 +85,17 @@ SET search_path TO cpu_priority; -- in their CREATE SUBSCRIPTION commands. SET citus.log_remote_commands TO ON; SET citus.grep_remote_commands = '%CREATE SUBSCRIPTION%'; +-- We disable binary protocol, so we have consistent output between PG13 and +-- PG14, beacuse PG13 doesn't support binary logical replication. +SET citus.enable_binary_protocol = false; SELECT master_move_shard_placement(11568900, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical'); -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx master_move_shard_placement --------------------------------------------------------------------- @@ -101,13 +104,13 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SET citus.cpu_priority_for_logical_replication_senders = 15; SELECT master_move_shard_placement(11568900, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical'); -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx master_move_shard_placement --------------------------------------------------------------------- @@ -116,13 +119,13 @@ DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx SET citus.max_high_priority_background_processes = 3; SELECT master_move_shard_placement(11568900, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical'); -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx master_move_shard_placement --------------------------------------------------------------------- @@ -138,21 +141,21 @@ SELECT pg_catalog.citus_split_shard_by_split_points( ARRAY['-1500000000'], ARRAY[:worker_1_node, :worker_2_node], 'force_logical'); -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_split_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_split_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_split_slot_xxxxxxx_xxxxxxx) DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx citus_split_shard_by_split_points --------------------------------------------------------------------- diff --git a/src/test/regress/expected/isolation_logical_replication_binaryless.out b/src/test/regress/expected/isolation_logical_replication_binaryless.out new file mode 100644 index 000000000..383929e33 --- /dev/null +++ b/src/test/regress/expected/isolation_logical_replication_binaryless.out @@ -0,0 +1,48 @@ +Parsed test spec with 3 sessions + +starting permutation: s3-acquire-advisory-lock s1-move-placement s2-insert s3-release-advisory-lock s1-select +step s3-acquire-advisory-lock: + SELECT pg_advisory_lock(44000, 55152); + +pg_advisory_lock +--------------------------------------------------------------------- + +(1 row) + +step s1-move-placement: + SELECT citus_move_shard_placement(45076800, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='force_logical'); + +step s2-insert: + INSERT INTO t_nonbinary (SELECT i, 'user postgres=r/postgres' FROM generate_series(6, 10) i); + +step s3-release-advisory-lock: + SELECT pg_advisory_unlock(44000, 55152); + +pg_advisory_unlock +--------------------------------------------------------------------- +t +(1 row) + +step s1-move-placement: <... completed> +citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +step s1-select: + SELECT * FROM t_nonbinary order by id; + +id|nonbinary +--------------------------------------------------------------------- + 1|postgres=r/postgres + 2|postgres=r/postgres + 3|postgres=r/postgres + 4|postgres=r/postgres + 5|postgres=r/postgres + 6|postgres=r/postgres + 7|postgres=r/postgres + 8|postgres=r/postgres + 9|postgres=r/postgres +10|postgres=r/postgres +(10 rows) + diff --git a/src/test/regress/expected/pg14.out b/src/test/regress/expected/pg14.out index fa6e42065..fb6312965 100644 --- a/src/test/regress/expected/pg14.out +++ b/src/test/regress/expected/pg14.out @@ -1467,6 +1467,34 @@ SELECT create_distributed_table('compression_and_generated_col', 'rev', colocate (1 row) +-- See that it's enabling the binary option for logical replication +BEGIN; +SET LOCAL citus.log_remote_commands TO ON; +SET LOCAL citus.grep_remote_commands = '%CREATE SUBSCRIPTION%'; +SELECT citus_move_shard_placement(980042, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode := 'force_logical'); +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx, binary=true) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx + citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +ROLLBACK; +-- See that it doesn't enable the binary option for logical replication if we +-- disable binary protocol. +BEGIN; +SET LOCAL citus.log_remote_commands TO ON; +SET LOCAL citus.grep_remote_commands = '%CREATE SUBSCRIPTION%'; +SET LOCAL citus.enable_binary_protocol = FALSE; +SELECT citus_move_shard_placement(980042, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode := 'force_logical'); +NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx) +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx + citus_move_shard_placement +--------------------------------------------------------------------- + +(1 row) + +ROLLBACK; DROP TABLE compression_and_defaults, compression_and_generated_col; -- cleanup set client_min_messages to error; diff --git a/src/test/regress/spec/isolation_logical_replication_binaryless.spec b/src/test/regress/spec/isolation_logical_replication_binaryless.spec new file mode 100644 index 000000000..f89dac62e --- /dev/null +++ b/src/test/regress/spec/isolation_logical_replication_binaryless.spec @@ -0,0 +1,52 @@ +// This file tests that logical replication works even when the table that's +// being moved contains columns that don't allow for binary encoding +setup +{ + SET citus.shard_count TO 1; + SET citus.shard_replication_factor TO 1; + ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 45076800; + CREATE TABLE t_nonbinary(id bigserial, nonbinary aclitem); + SELECT create_distributed_table('t_nonbinary', 'id'); + INSERT INTO t_nonbinary (SELECT i, 'user postgres=r/postgres' FROM generate_series(1, 5) i); +} + +teardown +{ + DROP TABLE t_nonbinary; +} + + +session "s1" + +step "s1-move-placement" +{ + SELECT citus_move_shard_placement(45076800, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='force_logical'); +} + +step "s1-select" +{ + SELECT * FROM t_nonbinary order by id; +} + +session "s2" +step "s2-insert" +{ + INSERT INTO t_nonbinary (SELECT i, 'user postgres=r/postgres' FROM generate_series(6, 10) i); +} + +session "s3" + +// this advisory lock with (almost) random values are only used +// for testing purposes. For details, check Citus' logical replication +// source code +step "s3-acquire-advisory-lock" +{ + SELECT pg_advisory_lock(44000, 55152); +} + +step "s3-release-advisory-lock" +{ + SELECT pg_advisory_unlock(44000, 55152); +} + +permutation "s3-acquire-advisory-lock" "s1-move-placement" "s2-insert" "s3-release-advisory-lock" "s1-select" diff --git a/src/test/regress/sql/binary_protocol.sql b/src/test/regress/sql/binary_protocol.sql index 8c9761313..a6eefc14e 100644 --- a/src/test/regress/sql/binary_protocol.sql +++ b/src/test/regress/sql/binary_protocol.sql @@ -1,4 +1,5 @@ SET citus.shard_count = 2; +SET citus.shard_replication_factor TO 1; SET citus.next_shard_id TO 4754000; CREATE SCHEMA binary_protocol; SET search_path TO binary_protocol, public; @@ -63,6 +64,8 @@ SELECT ARRAY[(col, col)::nested_composite_type] FROM composite_type_table; SELECT ARRAY[(col, col)::nested_composite_type_domain] FROM composite_type_table; +-- Confirm that aclitem doesn't have receive and send functions +SELECT typreceive, typsend FROM pg_type WHERE typname = 'aclitem'; CREATE TABLE binaryless_builtin ( col1 aclitem NOT NULL, col2 character varying(255) NOT NULL diff --git a/src/test/regress/sql/cpu_priority.sql b/src/test/regress/sql/cpu_priority.sql index 787018622..921934f88 100644 --- a/src/test/regress/sql/cpu_priority.sql +++ b/src/test/regress/sql/cpu_priority.sql @@ -63,6 +63,9 @@ SET search_path TO cpu_priority; -- in their CREATE SUBSCRIPTION commands. SET citus.log_remote_commands TO ON; SET citus.grep_remote_commands = '%CREATE SUBSCRIPTION%'; +-- We disable binary protocol, so we have consistent output between PG13 and +-- PG14, beacuse PG13 doesn't support binary logical replication. +SET citus.enable_binary_protocol = false; SELECT master_move_shard_placement(11568900, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical'); SET citus.cpu_priority_for_logical_replication_senders = 15; SELECT master_move_shard_placement(11568900, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical'); diff --git a/src/test/regress/sql/pg14.sql b/src/test/regress/sql/pg14.sql index e40d52c10..34baf863d 100644 --- a/src/test/regress/sql/pg14.sql +++ b/src/test/regress/sql/pg14.sql @@ -763,6 +763,22 @@ WITH ( ); SELECT create_distributed_table('compression_and_generated_col', 'rev', colocate_with:='none'); +-- See that it's enabling the binary option for logical replication +BEGIN; +SET LOCAL citus.log_remote_commands TO ON; +SET LOCAL citus.grep_remote_commands = '%CREATE SUBSCRIPTION%'; +SELECT citus_move_shard_placement(980042, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode := 'force_logical'); +ROLLBACK; + +-- See that it doesn't enable the binary option for logical replication if we +-- disable binary protocol. +BEGIN; +SET LOCAL citus.log_remote_commands TO ON; +SET LOCAL citus.grep_remote_commands = '%CREATE SUBSCRIPTION%'; +SET LOCAL citus.enable_binary_protocol = FALSE; +SELECT citus_move_shard_placement(980042, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode := 'force_logical'); +ROLLBACK; + DROP TABLE compression_and_defaults, compression_and_generated_col; -- cleanup