mirror of https://github.com/citusdata/citus.git
Allow specifying the shard_transfer_mode when replicating reference tables (#6070)
When using `citus.replicate_reference_tables_on_activate = off`, reference tables need to be replicated later. This can be done using the `replicate_reference_tables()` UDF. However, this function only allowed blocking replication. This changes the function to default to logical replication instead, and allows choosing any of our existing shard transfer modes.pull/6130/head
parent
a645cb4b94
commit
8017693b2f
|
@ -72,3 +72,4 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
|
||||||
#include "udfs/worker_copy_table_to_node/11.1-1.sql"
|
#include "udfs/worker_copy_table_to_node/11.1-1.sql"
|
||||||
#include "udfs/worker_split_shard_replication_setup/11.1-1.sql"
|
#include "udfs/worker_split_shard_replication_setup/11.1-1.sql"
|
||||||
#include "udfs/citus_isolation_test_session_is_blocked/11.1-1.sql"
|
#include "udfs/citus_isolation_test_session_is_blocked/11.1-1.sql"
|
||||||
|
#include "udfs/replicate_reference_tables/11.1-1.sql"
|
||||||
|
|
|
@ -89,3 +89,6 @@ DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_
|
||||||
|
|
||||||
DROP VIEW pg_catalog.citus_locks;
|
DROP VIEW pg_catalog.citus_locks;
|
||||||
DROP FUNCTION pg_catalog.citus_locks();
|
DROP FUNCTION pg_catalog.citus_locks();
|
||||||
|
|
||||||
|
DROP FUNCTION pg_catalog.replicate_reference_tables(citus.shard_transfer_mode);
|
||||||
|
#include "../udfs/replicate_reference_tables/9.3-2.sql"
|
||||||
|
|
|
@ -0,0 +1,8 @@
|
||||||
|
DROP FUNCTION pg_catalog.replicate_reference_tables;
|
||||||
|
CREATE FUNCTION pg_catalog.replicate_reference_tables(shard_transfer_mode citus.shard_transfer_mode default 'auto')
|
||||||
|
RETURNS VOID
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'MODULE_PATHNAME', $$replicate_reference_tables$$;
|
||||||
|
COMMENT ON FUNCTION pg_catalog.replicate_reference_tables(citus.shard_transfer_mode)
|
||||||
|
IS 'replicate reference tables to all nodes';
|
||||||
|
REVOKE ALL ON FUNCTION pg_catalog.replicate_reference_tables(citus.shard_transfer_mode) FROM PUBLIC;
|
|
@ -1,7 +1,8 @@
|
||||||
CREATE FUNCTION pg_catalog.replicate_reference_tables()
|
DROP FUNCTION pg_catalog.replicate_reference_tables;
|
||||||
|
CREATE FUNCTION pg_catalog.replicate_reference_tables(shard_transfer_mode citus.shard_transfer_mode default 'auto')
|
||||||
RETURNS VOID
|
RETURNS VOID
|
||||||
LANGUAGE C STRICT
|
LANGUAGE C STRICT
|
||||||
AS 'MODULE_PATHNAME', $$replicate_reference_tables$$;
|
AS 'MODULE_PATHNAME', $$replicate_reference_tables$$;
|
||||||
COMMENT ON FUNCTION pg_catalog.replicate_reference_tables()
|
COMMENT ON FUNCTION pg_catalog.replicate_reference_tables(citus.shard_transfer_mode)
|
||||||
IS 'replicate reference tables to all nodes';
|
IS 'replicate reference tables to all nodes';
|
||||||
REVOKE ALL ON FUNCTION pg_catalog.replicate_reference_tables() FROM PUBLIC;
|
REVOKE ALL ON FUNCTION pg_catalog.replicate_reference_tables(citus.shard_transfer_mode) FROM PUBLIC;
|
||||||
|
|
|
@ -62,7 +62,9 @@ PG_FUNCTION_INFO_V1(replicate_reference_tables);
|
||||||
Datum
|
Datum
|
||||||
replicate_reference_tables(PG_FUNCTION_ARGS)
|
replicate_reference_tables(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
EnsureReferenceTablesExistOnAllNodes();
|
Oid shardReplicationModeOid = PG_GETARG_OID(0);
|
||||||
|
char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
|
||||||
|
EnsureReferenceTablesExistOnAllNodesExtended(shardReplicationMode);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
|
|
@ -1081,6 +1081,7 @@ SELECT * FROM multi_extension.print_extension_changes();
|
||||||
function citus_internal.downgrade_columnar_storage(regclass) void |
|
function citus_internal.downgrade_columnar_storage(regclass) void |
|
||||||
function citus_internal.upgrade_columnar_storage(regclass) void |
|
function citus_internal.upgrade_columnar_storage(regclass) void |
|
||||||
function columnar.columnar_handler(internal) table_am_handler |
|
function columnar.columnar_handler(internal) table_am_handler |
|
||||||
|
function replicate_reference_tables() void |
|
||||||
function worker_cleanup_job_schema_cache() void |
|
function worker_cleanup_job_schema_cache() void |
|
||||||
function worker_create_schema(bigint,text) void |
|
function worker_create_schema(bigint,text) void |
|
||||||
function worker_fetch_foreign_file(text,text,bigint,text[],integer[]) void |
|
function worker_fetch_foreign_file(text,text,bigint,text[],integer[]) void |
|
||||||
|
@ -1097,6 +1098,7 @@ SELECT * FROM multi_extension.print_extension_changes();
|
||||||
table columnar.stripe |
|
table columnar.stripe |
|
||||||
| function citus_locks() SETOF record
|
| function citus_locks() SETOF record
|
||||||
| function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) void
|
| function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) void
|
||||||
|
| function replicate_reference_tables(citus.shard_transfer_mode) void
|
||||||
| function worker_copy_table_to_node(regclass,integer) void
|
| function worker_copy_table_to_node(regclass,integer) void
|
||||||
| function worker_split_copy(bigint,split_copy_info[]) void
|
| function worker_split_copy(bigint,split_copy_info[]) void
|
||||||
| function worker_split_shard_replication_setup(split_shard_info[]) SETOF replication_slot_info
|
| function worker_split_shard_replication_setup(split_shard_info[]) SETOF replication_slot_info
|
||||||
|
@ -1104,7 +1106,7 @@ SELECT * FROM multi_extension.print_extension_changes();
|
||||||
| type split_copy_info
|
| type split_copy_info
|
||||||
| type split_shard_info
|
| type split_shard_info
|
||||||
| view citus_locks
|
| view citus_locks
|
||||||
(30 rows)
|
(32 rows)
|
||||||
|
|
||||||
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
|
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
|
||||||
-- show running version
|
-- show running version
|
||||||
|
|
|
@ -870,7 +870,7 @@ SELECT count(*) FROM ref_table;
|
||||||
10
|
10
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
SELECT replicate_reference_tables();
|
SELECT replicate_reference_tables('block_writes');
|
||||||
replicate_reference_tables
|
replicate_reference_tables
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
@ -1052,7 +1052,14 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port);
|
||||||
1
|
1
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- detects correctly that referecence table doesn't have replica identity
|
||||||
SELECT replicate_reference_tables();
|
SELECT replicate_reference_tables();
|
||||||
|
ERROR: cannot use logical replication to transfer shards of the relation initially_not_replicated_reference_table since it doesn't have a REPLICA IDENTITY or PRIMARY KEY
|
||||||
|
DETAIL: UPDATE and DELETE commands on the shard will error out during logical replication unless there is a REPLICA IDENTITY or PRIMARY KEY.
|
||||||
|
HINT: If you wish to continue without a replica identity set the shard_transfer_mode to 'force_logical' or 'block_writes'.
|
||||||
|
CONTEXT: while executing command on localhost:xxxxx
|
||||||
|
-- allows force_logical
|
||||||
|
SELECT replicate_reference_tables('force_logical');
|
||||||
replicate_reference_tables
|
replicate_reference_tables
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
|
@ -182,7 +182,7 @@ ORDER BY 1;
|
||||||
function recover_prepared_transactions()
|
function recover_prepared_transactions()
|
||||||
function relation_is_a_known_shard(regclass)
|
function relation_is_a_known_shard(regclass)
|
||||||
function remove_local_tables_from_metadata()
|
function remove_local_tables_from_metadata()
|
||||||
function replicate_reference_tables()
|
function replicate_reference_tables(citus.shard_transfer_mode)
|
||||||
function replicate_table_shards(regclass,integer,integer,bigint[],citus.shard_transfer_mode)
|
function replicate_table_shards(regclass,integer,integer,bigint[],citus.shard_transfer_mode)
|
||||||
function role_exists(name)
|
function role_exists(name)
|
||||||
function run_command_on_all_nodes(text,boolean,boolean)
|
function run_command_on_all_nodes(text,boolean,boolean)
|
||||||
|
|
|
@ -567,7 +567,7 @@ ROLLBACK;
|
||||||
--
|
--
|
||||||
BEGIN;
|
BEGIN;
|
||||||
SELECT count(*) FROM ref_table;
|
SELECT count(*) FROM ref_table;
|
||||||
SELECT replicate_reference_tables();
|
SELECT replicate_reference_tables('block_writes');
|
||||||
INSERT INTO ref_table VALUES (11);
|
INSERT INTO ref_table VALUES (11);
|
||||||
SELECT count(*), sum(a) FROM ref_table;
|
SELECT count(*), sum(a) FROM ref_table;
|
||||||
UPDATE ref_table SET a = a + 1;
|
UPDATE ref_table SET a = a + 1;
|
||||||
|
@ -642,7 +642,10 @@ WHERE nodeport=:worker_1_port;
|
||||||
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
|
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
|
||||||
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
|
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
|
||||||
|
|
||||||
|
-- detects correctly that referecence table doesn't have replica identity
|
||||||
SELECT replicate_reference_tables();
|
SELECT replicate_reference_tables();
|
||||||
|
-- allows force_logical
|
||||||
|
SELECT replicate_reference_tables('force_logical');
|
||||||
|
|
||||||
SELECT result::int - :ref_table_placements
|
SELECT result::int - :ref_table_placements
|
||||||
FROM run_command_on_workers('SELECT count(*) FROM pg_dist_placement a, pg_dist_shard b, pg_class c WHERE a.shardid=b.shardid AND b.logicalrelid=c.oid AND c.relname=''ref_table''')
|
FROM run_command_on_workers('SELECT count(*) FROM pg_dist_placement a, pg_dist_shard b, pg_class c WHERE a.shardid=b.shardid AND b.logicalrelid=c.oid AND c.relname=''ref_table''')
|
||||||
|
|
Loading…
Reference in New Issue