mirror of https://github.com/citusdata/citus.git
Enable binary logical replication for shard moves (#6017)
Using binary encoding can save a lot of CPU cycles, both on the sender and on the receiver. Since the walsender and walreceiver processes are single threaded, this can matter a lot for the throughput if they are bottlenecked on CPU. This feature is only available in PG14, not PG13. It should be safe to always enable because it's only used for types that support binary encoding according to the PG docs: > Even when this option is enabled, only data types that have binary > send and receive functions will be transferred in binary. But in case it causes problems, it can still be disabled by setting `citus.enable_binary_protocol` to `false`.pull/6228/head
parent
07cfba461a
commit
e0ada050aa
|
@ -1807,12 +1807,22 @@ CreateSubscriptions(MultiConnection *sourceConnection,
|
||||||
appendStringInfo(createSubscriptionCommand,
|
appendStringInfo(createSubscriptionCommand,
|
||||||
"CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
|
"CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s "
|
||||||
"WITH (citus_use_authinfo=true, create_slot=false, "
|
"WITH (citus_use_authinfo=true, create_slot=false, "
|
||||||
" copy_data=false, enabled=false, slot_name=%s)",
|
"copy_data=false, enabled=false, slot_name=%s",
|
||||||
quote_identifier(target->subscriptionName),
|
quote_identifier(target->subscriptionName),
|
||||||
quote_literal_cstr(conninfo->data),
|
quote_literal_cstr(conninfo->data),
|
||||||
quote_identifier(target->publication->name),
|
quote_identifier(target->publication->name),
|
||||||
quote_identifier(target->replicationSlot->name));
|
quote_identifier(target->replicationSlot->name));
|
||||||
|
|
||||||
|
if (EnableBinaryProtocol && PG_VERSION_NUM >= PG_VERSION_14)
|
||||||
|
{
|
||||||
|
appendStringInfoString(createSubscriptionCommand, ", binary=true)");
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
appendStringInfoString(createSubscriptionCommand, ")");
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
ExecuteCriticalRemoteCommand(target->superuserConnection,
|
ExecuteCriticalRemoteCommand(target->superuserConnection,
|
||||||
createSubscriptionCommand->data);
|
createSubscriptionCommand->data);
|
||||||
pfree(createSubscriptionCommand->data);
|
pfree(createSubscriptionCommand->data);
|
||||||
|
|
|
@ -6,3 +6,4 @@ test: isolation_setup
|
||||||
test: isolation_cluster_management
|
test: isolation_cluster_management
|
||||||
|
|
||||||
test: isolation_logical_replication_with_partitioning
|
test: isolation_logical_replication_with_partitioning
|
||||||
|
test: isolation_logical_replication_binaryless
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
SET citus.shard_count = 2;
|
SET citus.shard_count = 2;
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
SET citus.next_shard_id TO 4754000;
|
SET citus.next_shard_id TO 4754000;
|
||||||
CREATE SCHEMA binary_protocol;
|
CREATE SCHEMA binary_protocol;
|
||||||
SET search_path TO binary_protocol, public;
|
SET search_path TO binary_protocol, public;
|
||||||
|
@ -196,6 +197,13 @@ SELECT ARRAY[(col, col)::nested_composite_type_domain] FROM composite_type_table
|
||||||
{"(\"(1,2)\",\"(1,2)\")"}
|
{"(\"(1,2)\",\"(1,2)\")"}
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- Confirm that aclitem doesn't have receive and send functions
|
||||||
|
SELECT typreceive, typsend FROM pg_type WHERE typname = 'aclitem';
|
||||||
|
typreceive | typsend
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
- | -
|
||||||
|
(1 row)
|
||||||
|
|
||||||
CREATE TABLE binaryless_builtin (
|
CREATE TABLE binaryless_builtin (
|
||||||
col1 aclitem NOT NULL,
|
col1 aclitem NOT NULL,
|
||||||
col2 character varying(255) NOT NULL
|
col2 character varying(255) NOT NULL
|
||||||
|
|
|
@ -85,6 +85,9 @@ SET search_path TO cpu_priority;
|
||||||
-- in their CREATE SUBSCRIPTION commands.
|
-- in their CREATE SUBSCRIPTION commands.
|
||||||
SET citus.log_remote_commands TO ON;
|
SET citus.log_remote_commands TO ON;
|
||||||
SET citus.grep_remote_commands = '%CREATE SUBSCRIPTION%';
|
SET citus.grep_remote_commands = '%CREATE SUBSCRIPTION%';
|
||||||
|
-- We disable binary protocol, so we have consistent output between PG13 and
|
||||||
|
-- PG14, beacuse PG13 doesn't support binary logical replication.
|
||||||
|
SET citus.enable_binary_protocol = false;
|
||||||
SELECT master_move_shard_placement(11568900, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
|
SELECT master_move_shard_placement(11568900, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
|
||||||
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
|
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
|
||||||
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
|
|
@ -0,0 +1,48 @@
|
||||||
|
Parsed test spec with 3 sessions
|
||||||
|
|
||||||
|
starting permutation: s3-acquire-advisory-lock s1-move-placement s2-insert s3-release-advisory-lock s1-select
|
||||||
|
step s3-acquire-advisory-lock:
|
||||||
|
SELECT pg_advisory_lock(44000, 55152);
|
||||||
|
|
||||||
|
pg_advisory_lock
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-move-placement:
|
||||||
|
SELECT citus_move_shard_placement(45076800, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='force_logical');
|
||||||
|
<waiting ...>
|
||||||
|
step s2-insert:
|
||||||
|
INSERT INTO t_nonbinary (SELECT i, 'user postgres=r/postgres' FROM generate_series(6, 10) i);
|
||||||
|
|
||||||
|
step s3-release-advisory-lock:
|
||||||
|
SELECT pg_advisory_unlock(44000, 55152);
|
||||||
|
|
||||||
|
pg_advisory_unlock
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
t
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-move-placement: <... completed>
|
||||||
|
citus_move_shard_placement
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
step s1-select:
|
||||||
|
SELECT * FROM t_nonbinary order by id;
|
||||||
|
|
||||||
|
id|nonbinary
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1|postgres=r/postgres
|
||||||
|
2|postgres=r/postgres
|
||||||
|
3|postgres=r/postgres
|
||||||
|
4|postgres=r/postgres
|
||||||
|
5|postgres=r/postgres
|
||||||
|
6|postgres=r/postgres
|
||||||
|
7|postgres=r/postgres
|
||||||
|
8|postgres=r/postgres
|
||||||
|
9|postgres=r/postgres
|
||||||
|
10|postgres=r/postgres
|
||||||
|
(10 rows)
|
||||||
|
|
|
@ -1467,6 +1467,34 @@ SELECT create_distributed_table('compression_and_generated_col', 'rev', colocate
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- See that it's enabling the binary option for logical replication
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.log_remote_commands TO ON;
|
||||||
|
SET LOCAL citus.grep_remote_commands = '%CREATE SUBSCRIPTION%';
|
||||||
|
SELECT citus_move_shard_placement(980042, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode := 'force_logical');
|
||||||
|
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx, binary=true)
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
citus_move_shard_placement
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
|
-- See that it doesn't enable the binary option for logical replication if we
|
||||||
|
-- disable binary protocol.
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.log_remote_commands TO ON;
|
||||||
|
SET LOCAL citus.grep_remote_commands = '%CREATE SUBSCRIPTION%';
|
||||||
|
SET LOCAL citus.enable_binary_protocol = FALSE;
|
||||||
|
SELECT citus_move_shard_placement(980042, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode := 'force_logical');
|
||||||
|
NOTICE: issuing CREATE SUBSCRIPTION citus_shard_move_subscription_xxxxxxx CONNECTION 'host=''localhost'' port=xxxxx user=''postgres'' dbname=''regression'' connect_timeout=20' PUBLICATION citus_shard_move_publication_xxxxxxx_xxxxxxx WITH (citus_use_authinfo=true, create_slot=false, copy_data=false, enabled=false, slot_name=citus_shard_move_slot_xxxxxxx_xxxxxxx)
|
||||||
|
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
|
||||||
|
citus_move_shard_placement
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ROLLBACK;
|
||||||
DROP TABLE compression_and_defaults, compression_and_generated_col;
|
DROP TABLE compression_and_defaults, compression_and_generated_col;
|
||||||
-- cleanup
|
-- cleanup
|
||||||
set client_min_messages to error;
|
set client_min_messages to error;
|
||||||
|
|
|
@ -0,0 +1,52 @@
|
||||||
|
// This file tests that logical replication works even when the table that's
|
||||||
|
// being moved contains columns that don't allow for binary encoding
|
||||||
|
setup
|
||||||
|
{
|
||||||
|
SET citus.shard_count TO 1;
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
|
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 45076800;
|
||||||
|
CREATE TABLE t_nonbinary(id bigserial, nonbinary aclitem);
|
||||||
|
SELECT create_distributed_table('t_nonbinary', 'id');
|
||||||
|
INSERT INTO t_nonbinary (SELECT i, 'user postgres=r/postgres' FROM generate_series(1, 5) i);
|
||||||
|
}
|
||||||
|
|
||||||
|
teardown
|
||||||
|
{
|
||||||
|
DROP TABLE t_nonbinary;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
session "s1"
|
||||||
|
|
||||||
|
step "s1-move-placement"
|
||||||
|
{
|
||||||
|
SELECT citus_move_shard_placement(45076800, 'localhost', 57637, 'localhost', 57638, shard_transfer_mode:='force_logical');
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s1-select"
|
||||||
|
{
|
||||||
|
SELECT * FROM t_nonbinary order by id;
|
||||||
|
}
|
||||||
|
|
||||||
|
session "s2"
|
||||||
|
step "s2-insert"
|
||||||
|
{
|
||||||
|
INSERT INTO t_nonbinary (SELECT i, 'user postgres=r/postgres' FROM generate_series(6, 10) i);
|
||||||
|
}
|
||||||
|
|
||||||
|
session "s3"
|
||||||
|
|
||||||
|
// this advisory lock with (almost) random values are only used
|
||||||
|
// for testing purposes. For details, check Citus' logical replication
|
||||||
|
// source code
|
||||||
|
step "s3-acquire-advisory-lock"
|
||||||
|
{
|
||||||
|
SELECT pg_advisory_lock(44000, 55152);
|
||||||
|
}
|
||||||
|
|
||||||
|
step "s3-release-advisory-lock"
|
||||||
|
{
|
||||||
|
SELECT pg_advisory_unlock(44000, 55152);
|
||||||
|
}
|
||||||
|
|
||||||
|
permutation "s3-acquire-advisory-lock" "s1-move-placement" "s2-insert" "s3-release-advisory-lock" "s1-select"
|
|
@ -1,4 +1,5 @@
|
||||||
SET citus.shard_count = 2;
|
SET citus.shard_count = 2;
|
||||||
|
SET citus.shard_replication_factor TO 1;
|
||||||
SET citus.next_shard_id TO 4754000;
|
SET citus.next_shard_id TO 4754000;
|
||||||
CREATE SCHEMA binary_protocol;
|
CREATE SCHEMA binary_protocol;
|
||||||
SET search_path TO binary_protocol, public;
|
SET search_path TO binary_protocol, public;
|
||||||
|
@ -63,6 +64,8 @@ SELECT ARRAY[(col, col)::nested_composite_type] FROM composite_type_table;
|
||||||
SELECT ARRAY[(col, col)::nested_composite_type_domain] FROM composite_type_table;
|
SELECT ARRAY[(col, col)::nested_composite_type_domain] FROM composite_type_table;
|
||||||
|
|
||||||
|
|
||||||
|
-- Confirm that aclitem doesn't have receive and send functions
|
||||||
|
SELECT typreceive, typsend FROM pg_type WHERE typname = 'aclitem';
|
||||||
CREATE TABLE binaryless_builtin (
|
CREATE TABLE binaryless_builtin (
|
||||||
col1 aclitem NOT NULL,
|
col1 aclitem NOT NULL,
|
||||||
col2 character varying(255) NOT NULL
|
col2 character varying(255) NOT NULL
|
||||||
|
|
|
@ -63,6 +63,9 @@ SET search_path TO cpu_priority;
|
||||||
-- in their CREATE SUBSCRIPTION commands.
|
-- in their CREATE SUBSCRIPTION commands.
|
||||||
SET citus.log_remote_commands TO ON;
|
SET citus.log_remote_commands TO ON;
|
||||||
SET citus.grep_remote_commands = '%CREATE SUBSCRIPTION%';
|
SET citus.grep_remote_commands = '%CREATE SUBSCRIPTION%';
|
||||||
|
-- We disable binary protocol, so we have consistent output between PG13 and
|
||||||
|
-- PG14, beacuse PG13 doesn't support binary logical replication.
|
||||||
|
SET citus.enable_binary_protocol = false;
|
||||||
SELECT master_move_shard_placement(11568900, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
|
SELECT master_move_shard_placement(11568900, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
|
||||||
SET citus.cpu_priority_for_logical_replication_senders = 15;
|
SET citus.cpu_priority_for_logical_replication_senders = 15;
|
||||||
SELECT master_move_shard_placement(11568900, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical');
|
SELECT master_move_shard_placement(11568900, 'localhost', :worker_2_port, 'localhost', :worker_1_port, 'force_logical');
|
||||||
|
|
|
@ -763,6 +763,22 @@ WITH (
|
||||||
);
|
);
|
||||||
SELECT create_distributed_table('compression_and_generated_col', 'rev', colocate_with:='none');
|
SELECT create_distributed_table('compression_and_generated_col', 'rev', colocate_with:='none');
|
||||||
|
|
||||||
|
-- See that it's enabling the binary option for logical replication
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.log_remote_commands TO ON;
|
||||||
|
SET LOCAL citus.grep_remote_commands = '%CREATE SUBSCRIPTION%';
|
||||||
|
SELECT citus_move_shard_placement(980042, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode := 'force_logical');
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
|
-- See that it doesn't enable the binary option for logical replication if we
|
||||||
|
-- disable binary protocol.
|
||||||
|
BEGIN;
|
||||||
|
SET LOCAL citus.log_remote_commands TO ON;
|
||||||
|
SET LOCAL citus.grep_remote_commands = '%CREATE SUBSCRIPTION%';
|
||||||
|
SET LOCAL citus.enable_binary_protocol = FALSE;
|
||||||
|
SELECT citus_move_shard_placement(980042, 'localhost', :worker_1_port, 'localhost', :worker_2_port, shard_transfer_mode := 'force_logical');
|
||||||
|
ROLLBACK;
|
||||||
|
|
||||||
DROP TABLE compression_and_defaults, compression_and_generated_col;
|
DROP TABLE compression_and_defaults, compression_and_generated_col;
|
||||||
|
|
||||||
-- cleanup
|
-- cleanup
|
||||||
|
|
Loading…
Reference in New Issue