From 8017693b2fbe1c54cc7c61161b3eec3867811ce8 Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Tue, 9 Aug 2022 12:21:31 +0200 Subject: [PATCH] Allow specifying the shard_transfer_mode when replicating reference tables (#6070) When using `citus.replicate_reference_tables_on_activate = off`, reference tables need to be replicated later. This can be done using the `replicate_reference_tables()` UDF. However, this function only allowed blocking replication. This changes the function to default to logical replication instead, and allows choosing any of our existing shard transfer modes. --- src/backend/distributed/sql/citus--11.0-4--11.1-1.sql | 1 + .../distributed/sql/downgrades/citus--11.1-1--11.0-4.sql | 3 +++ .../sql/udfs/replicate_reference_tables/11.1-1.sql | 8 ++++++++ .../sql/udfs/replicate_reference_tables/latest.sql | 7 ++++--- src/backend/distributed/utils/reference_table_utils.c | 4 +++- src/test/regress/expected/multi_extension.out | 4 +++- .../regress/expected/multi_replicate_reference_table.out | 9 ++++++++- src/test/regress/expected/upgrade_list_citus_objects.out | 2 +- src/test/regress/sql/multi_replicate_reference_table.sql | 5 ++++- 9 files changed, 35 insertions(+), 8 deletions(-) create mode 100644 src/backend/distributed/sql/udfs/replicate_reference_tables/11.1-1.sql diff --git a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql index 7160882b0..80c40238e 100644 --- a/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-4--11.1-1.sql @@ -72,3 +72,4 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_ #include "udfs/worker_copy_table_to_node/11.1-1.sql" #include "udfs/worker_split_shard_replication_setup/11.1-1.sql" #include "udfs/citus_isolation_test_session_is_blocked/11.1-1.sql" +#include "udfs/replicate_reference_tables/11.1-1.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql index 002ea471b..be213c26e 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-4.sql @@ -89,3 +89,6 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_ DROP VIEW pg_catalog.citus_locks; DROP FUNCTION pg_catalog.citus_locks(); + +DROP FUNCTION pg_catalog.replicate_reference_tables(citus.shard_transfer_mode); +#include "../udfs/replicate_reference_tables/9.3-2.sql" diff --git a/src/backend/distributed/sql/udfs/replicate_reference_tables/11.1-1.sql b/src/backend/distributed/sql/udfs/replicate_reference_tables/11.1-1.sql new file mode 100644 index 000000000..0d57ac870 --- /dev/null +++ b/src/backend/distributed/sql/udfs/replicate_reference_tables/11.1-1.sql @@ -0,0 +1,8 @@ +DROP FUNCTION pg_catalog.replicate_reference_tables; +CREATE FUNCTION pg_catalog.replicate_reference_tables(shard_transfer_mode citus.shard_transfer_mode default 'auto') + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$replicate_reference_tables$$; +COMMENT ON FUNCTION pg_catalog.replicate_reference_tables(citus.shard_transfer_mode) + IS 'replicate reference tables to all nodes'; +REVOKE ALL ON FUNCTION pg_catalog.replicate_reference_tables(citus.shard_transfer_mode) FROM PUBLIC; diff --git a/src/backend/distributed/sql/udfs/replicate_reference_tables/latest.sql b/src/backend/distributed/sql/udfs/replicate_reference_tables/latest.sql index 556899eb2..0d57ac870 100644 --- a/src/backend/distributed/sql/udfs/replicate_reference_tables/latest.sql +++ b/src/backend/distributed/sql/udfs/replicate_reference_tables/latest.sql @@ -1,7 +1,8 @@ -CREATE FUNCTION pg_catalog.replicate_reference_tables() +DROP FUNCTION pg_catalog.replicate_reference_tables; +CREATE FUNCTION pg_catalog.replicate_reference_tables(shard_transfer_mode citus.shard_transfer_mode default 'auto') RETURNS VOID LANGUAGE C STRICT AS 'MODULE_PATHNAME', $$replicate_reference_tables$$; -COMMENT ON FUNCTION pg_catalog.replicate_reference_tables() +COMMENT ON FUNCTION pg_catalog.replicate_reference_tables(citus.shard_transfer_mode) IS 'replicate reference tables to all nodes'; -REVOKE ALL ON FUNCTION pg_catalog.replicate_reference_tables() FROM PUBLIC; +REVOKE ALL ON FUNCTION pg_catalog.replicate_reference_tables(citus.shard_transfer_mode) FROM PUBLIC; diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index e0cab96d6..0ea02fe2c 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -62,7 +62,9 @@ PG_FUNCTION_INFO_V1(replicate_reference_tables); Datum replicate_reference_tables(PG_FUNCTION_ARGS) { - EnsureReferenceTablesExistOnAllNodes(); + Oid shardReplicationModeOid = PG_GETARG_OID(0); + char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid); + EnsureReferenceTablesExistOnAllNodesExtended(shardReplicationMode); PG_RETURN_VOID(); } diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 92d54c064..03ad6d794 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1081,6 +1081,7 @@ SELECT * FROM multi_extension.print_extension_changes(); function citus_internal.downgrade_columnar_storage(regclass) void | function citus_internal.upgrade_columnar_storage(regclass) void | function columnar.columnar_handler(internal) table_am_handler | + function replicate_reference_tables() void | function worker_cleanup_job_schema_cache() void | function worker_create_schema(bigint,text) void | function worker_fetch_foreign_file(text,text,bigint,text[],integer[]) void | @@ -1097,6 +1098,7 @@ SELECT * FROM multi_extension.print_extension_changes(); table columnar.stripe | | function citus_locks() SETOF record | function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) void + | function replicate_reference_tables(citus.shard_transfer_mode) void | function worker_copy_table_to_node(regclass,integer) void | function worker_split_copy(bigint,split_copy_info[]) void | function worker_split_shard_replication_setup(split_shard_info[]) SETOF replication_slot_info @@ -1104,7 +1106,7 @@ SELECT * FROM multi_extension.print_extension_changes(); | type split_copy_info | type split_shard_info | view citus_locks -(30 rows) +(32 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/multi_replicate_reference_table.out b/src/test/regress/expected/multi_replicate_reference_table.out index 9f27e05a7..ab61cbd6d 100644 --- a/src/test/regress/expected/multi_replicate_reference_table.out +++ b/src/test/regress/expected/multi_replicate_reference_table.out @@ -870,7 +870,7 @@ SELECT count(*) FROM ref_table; 10 (1 row) -SELECT replicate_reference_tables(); +SELECT replicate_reference_tables('block_writes'); replicate_reference_tables --------------------------------------------------------------------- @@ -1052,7 +1052,14 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port); 1 (1 row) +-- detects correctly that referecence table doesn't have replica identity SELECT replicate_reference_tables(); +ERROR: cannot use logical replication to transfer shards of the relation initially_not_replicated_reference_table since it doesn't have a REPLICA IDENTITY or PRIMARY KEY +DETAIL: UPDATE and DELETE commands on the shard will error out during logical replication unless there is a REPLICA IDENTITY or PRIMARY KEY. +HINT: If you wish to continue without a replica identity set the shard_transfer_mode to 'force_logical' or 'block_writes'. +CONTEXT: while executing command on localhost:xxxxx +-- allows force_logical +SELECT replicate_reference_tables('force_logical'); replicate_reference_tables --------------------------------------------------------------------- diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 532b61517..bcaff941f 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -182,7 +182,7 @@ ORDER BY 1; function recover_prepared_transactions() function relation_is_a_known_shard(regclass) function remove_local_tables_from_metadata() - function replicate_reference_tables() + function replicate_reference_tables(citus.shard_transfer_mode) function replicate_table_shards(regclass,integer,integer,bigint[],citus.shard_transfer_mode) function role_exists(name) function run_command_on_all_nodes(text,boolean,boolean) diff --git a/src/test/regress/sql/multi_replicate_reference_table.sql b/src/test/regress/sql/multi_replicate_reference_table.sql index c5671a869..50dd0a5b2 100644 --- a/src/test/regress/sql/multi_replicate_reference_table.sql +++ b/src/test/regress/sql/multi_replicate_reference_table.sql @@ -567,7 +567,7 @@ ROLLBACK; -- BEGIN; SELECT count(*) FROM ref_table; -SELECT replicate_reference_tables(); +SELECT replicate_reference_tables('block_writes'); INSERT INTO ref_table VALUES (11); SELECT count(*), sum(a) FROM ref_table; UPDATE ref_table SET a = a + 1; @@ -642,7 +642,10 @@ WHERE nodeport=:worker_1_port; SELECT 1 FROM master_remove_node('localhost', :worker_2_port); SELECT 1 FROM master_add_node('localhost', :worker_2_port); +-- detects correctly that referecence table doesn't have replica identity SELECT replicate_reference_tables(); +-- allows force_logical +SELECT replicate_reference_tables('force_logical'); SELECT result::int - :ref_table_placements FROM run_command_on_workers('SELECT count(*) FROM pg_dist_placement a, pg_dist_shard b, pg_class c WHERE a.shardid=b.shardid AND b.logicalrelid=c.oid AND c.relname=''ref_table''')