Move update_relation_colocation, start_replication_origin_tracking, stop_replication_origin_tracking, is_replication_origin_tracking_active to citus_internal schema

pull/7478/head
eaydingol 2024-02-02 10:41:18 +00:00
parent e403fdc28e
commit 2865840d71
13 changed files with 124 additions and 27 deletions

View File

@ -2429,7 +2429,7 @@ Cleanup records always need to be committed before creating the actual object. I
PostgreSQL supports change data capture (CDC) via the logical decoding interface. The basic idea behind logical decoding is that you make a replication connection (a special type of postgres connection), start replication, and then the backend process reads through the WAL, decodes the WAL records, and emits them over the wire in a format defined by the output plugin. If we were to use regular logical decoding on the nodes of a Citus cluster, we would see the name of the shard in each write, and internal data transfers such as shard moves would result in inserts being emitted. We use several techniques to avoid this.
All writes in PostgreSQL are marked with a replication origin (0 by default) and the decoder can make decisions on whether to emit the change based on the replication origin. We use this to filter out internal data transfers. If `citus.enable_change_data_capture` is enabled, all internal data transfers are marked with the special DoNotReplicateId replication origin by calling the `citus_internal_start_replication_origin_tracking()` UDF before writing the data. This replication origin ID is special in the sense that it does not need to be created (which prevents locking issues, especially when dropping replication origins). It is still up to output plugin to decide what to do with changes marked as DoNotReplicateId.
All writes in PostgreSQL are marked with a replication origin (0 by default) and the decoder can make decisions on whether to emit the change based on the replication origin. We use this to filter out internal data transfers. If `citus.enable_change_data_capture` is enabled, all internal data transfers are marked with the special DoNotReplicateId replication origin by calling the `citus_internal.start_replication_origin_tracking()` UDF before writing the data. This replication origin ID is special in the sense that it does not need to be created (which prevents locking issues, especially when dropping replication origins). It is still up to the output plugin to decide what to do with changes marked as DoNotReplicateId.
We have very minimal control over replication commands like `CREATE_REPLICATION_SLOT`, since there are no direct hooks, and decoder names (e.g. “pgoutput”) are typically hard-coded in the client. The only method we found of overriding logical decoding behaviour is to overload the output plugin name in the dynamic library path.

View File

@ -1424,7 +1424,7 @@ ColocationIdUpdateCommand(Oid relationId, uint32 colocationId)
StringInfo command = makeStringInfo();
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
appendStringInfo(command,
"SELECT citus_internal_update_relation_colocation(%s::regclass, %d)",
"SELECT citus_internal.update_relation_colocation(%s::regclass, %d)",
quote_literal_cstr(qualifiedRelationName), colocationId);
return command->data;

View File

@ -50,3 +50,5 @@ DROP VIEW IF EXISTS pg_catalog.citus_lock_waits;
#include "udfs/citus_drop_trigger/12.2-1.sql"
#include "udfs/citus_internal_update_none_dist_table_metadata/12.2-1.sql"
#include "udfs/citus_internal_update_placement_metadata/12.2-1.sql"
#include "udfs/citus_internal_update_relation_colocation/12.2-1.sql"
#include "udfs/repl_origin_helper/12.2-1.sql"

View File

@ -47,3 +47,7 @@ DROP FUNCTION citus_internal.unregister_tenant_schema_globally(oid, text);
#include "../udfs/citus_drop_trigger/12.0-1.sql"
DROP FUNCTION citus_internal.update_none_dist_table_metadata(oid, "char", bigint, boolean);
DROP FUNCTION citus_internal.update_placement_metadata(bigint, integer, integer);
-- Drop the citus_internal-schema functions for colocation updates and
-- replication origin tracking. Only the citus_internal.* variants are removed
-- by these statements; no pg_catalog function is dropped here.
DROP FUNCTION citus_internal.update_relation_colocation(oid, int);
DROP FUNCTION citus_internal.start_replication_origin_tracking();
DROP FUNCTION citus_internal.stop_replication_origin_tracking();
DROP FUNCTION citus_internal.is_replication_origin_tracking_active();

View File

@ -0,0 +1,14 @@
-- citus_internal-schema variant. The link symbol is spelled out because the
-- SQL name (update_relation_colocation) differs from the C symbol name.
CREATE OR REPLACE FUNCTION citus_internal.update_relation_colocation(
    relation_id oid,
    target_colocation_id integer
)
    RETURNS void
    LANGUAGE C
    STRICT
    AS 'MODULE_PATHNAME', $$citus_internal_update_relation_colocation$$;

COMMENT ON FUNCTION citus_internal.update_relation_colocation(oid, integer)
    IS 'Updates colocationId field of pg_dist_partition for the relation_id';

-- pg_catalog variant with the same signature. No link symbol is given, so
-- PostgreSQL uses the SQL function name as the C symbol.
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_update_relation_colocation(
    relation_id oid,
    target_colocation_id integer
)
    RETURNS void
    LANGUAGE C
    STRICT
    AS 'MODULE_PATHNAME';

COMMENT ON FUNCTION pg_catalog.citus_internal_update_relation_colocation(oid, integer)
    IS 'Updates colocationId field of pg_dist_partition for the relation_id';

View File

@ -1,3 +1,10 @@
-- citus_internal-schema variant. The link symbol is spelled out because the
-- SQL name (update_relation_colocation) differs from the C symbol name.
CREATE OR REPLACE FUNCTION citus_internal.update_relation_colocation(
    relation_id oid,
    target_colocation_id integer
)
    RETURNS void
    LANGUAGE C
    STRICT
    AS 'MODULE_PATHNAME', $$citus_internal_update_relation_colocation$$;

COMMENT ON FUNCTION citus_internal.update_relation_colocation(oid, integer)
    IS 'Updates colocationId field of pg_dist_partition for the relation_id';
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_update_relation_colocation(relation_id Oid, target_colocation_id int)
RETURNS void
LANGUAGE C STRICT

View File

@ -0,0 +1,41 @@
-- Replication origin tracking helpers.
-- Each helper is defined twice: once in the citus_internal schema and once in
-- pg_catalog under a citus_internal_ prefixed name. Both definitions of a
-- helper name the same C link symbol.

-- citus_internal schema: start / stop / is-active
CREATE OR REPLACE FUNCTION citus_internal.start_replication_origin_tracking()
    RETURNS void
    LANGUAGE C
    STRICT
    AS 'MODULE_PATHNAME', $$citus_internal_start_replication_origin_tracking$$;

COMMENT ON FUNCTION citus_internal.start_replication_origin_tracking()
    IS 'To start replication origin tracking for skipping publishing of duplicated events during internal data movements for CDC';

CREATE OR REPLACE FUNCTION citus_internal.stop_replication_origin_tracking()
    RETURNS void
    LANGUAGE C
    STRICT
    AS 'MODULE_PATHNAME', $$citus_internal_stop_replication_origin_tracking$$;

COMMENT ON FUNCTION citus_internal.stop_replication_origin_tracking()
    IS 'To stop replication origin tracking for skipping publishing of duplicated events during internal data movements for CDC';

CREATE OR REPLACE FUNCTION citus_internal.is_replication_origin_tracking_active()
    RETURNS boolean
    LANGUAGE C
    STRICT
    AS 'MODULE_PATHNAME', $$citus_internal_is_replication_origin_tracking_active$$;

COMMENT ON FUNCTION citus_internal.is_replication_origin_tracking_active()
    IS 'To check if replication origin tracking is active for skipping publishing of duplicated events during internal data movements for CDC';

-- pg_catalog schema: start / stop / is-active
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_start_replication_origin_tracking()
    RETURNS void
    LANGUAGE C
    STRICT
    AS 'MODULE_PATHNAME', $$citus_internal_start_replication_origin_tracking$$;

COMMENT ON FUNCTION pg_catalog.citus_internal_start_replication_origin_tracking()
    IS 'To start replication origin tracking for skipping publishing of duplicated events during internal data movements for CDC';

CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_stop_replication_origin_tracking()
    RETURNS void
    LANGUAGE C
    STRICT
    AS 'MODULE_PATHNAME', $$citus_internal_stop_replication_origin_tracking$$;

COMMENT ON FUNCTION pg_catalog.citus_internal_stop_replication_origin_tracking()
    IS 'To stop replication origin tracking for skipping publishing of duplicated events during internal data movements for CDC';

CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_is_replication_origin_tracking_active()
    RETURNS boolean
    LANGUAGE C
    STRICT
    AS 'MODULE_PATHNAME', $$citus_internal_is_replication_origin_tracking_active$$;

COMMENT ON FUNCTION pg_catalog.citus_internal_is_replication_origin_tracking_active()
    IS 'To check if replication origin tracking is active for skipping publishing of duplicated events during internal data movements for CDC';

View File

@ -1,3 +1,24 @@
-- citus_internal-schema replication origin tracking helpers
-- (start / stop / is-active). Each names its C link symbol explicitly because
-- the SQL name carries no citus_internal_ prefix.
CREATE OR REPLACE FUNCTION citus_internal.start_replication_origin_tracking()
    RETURNS void
    LANGUAGE C
    STRICT
    AS 'MODULE_PATHNAME', $$citus_internal_start_replication_origin_tracking$$;

COMMENT ON FUNCTION citus_internal.start_replication_origin_tracking()
    IS 'To start replication origin tracking for skipping publishing of duplicated events during internal data movements for CDC';

CREATE OR REPLACE FUNCTION citus_internal.stop_replication_origin_tracking()
    RETURNS void
    LANGUAGE C
    STRICT
    AS 'MODULE_PATHNAME', $$citus_internal_stop_replication_origin_tracking$$;

COMMENT ON FUNCTION citus_internal.stop_replication_origin_tracking()
    IS 'To stop replication origin tracking for skipping publishing of duplicated events during internal data movements for CDC';

CREATE OR REPLACE FUNCTION citus_internal.is_replication_origin_tracking_active()
    RETURNS boolean
    LANGUAGE C
    STRICT
    AS 'MODULE_PATHNAME', $$citus_internal_is_replication_origin_tracking_active$$;

COMMENT ON FUNCTION citus_internal.is_replication_origin_tracking_active()
    IS 'To check if replication origin tracking is active for skipping publishing of duplicated events during internal data movements for CDC';
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_start_replication_origin_tracking()
RETURNS void
LANGUAGE C STRICT

View File

@ -186,7 +186,7 @@ SetupReplicationOriginRemoteSession(MultiConnection *connection)
{
StringInfo replicationOriginSessionSetupQuery = makeStringInfo();
appendStringInfo(replicationOriginSessionSetupQuery,
"select pg_catalog.citus_internal_start_replication_origin_tracking();");
"select citus_internal.start_replication_origin_tracking();");
ExecuteCriticalRemoteCommand(connection,
replicationOriginSessionSetupQuery->data);
connection->isReplicationOriginSessionSetup = true;
@ -205,7 +205,7 @@ ResetReplicationOriginRemoteSession(MultiConnection *connection)
{
StringInfo replicationOriginSessionResetQuery = makeStringInfo();
appendStringInfo(replicationOriginSessionResetQuery,
"select pg_catalog.citus_internal_stop_replication_origin_tracking();");
"select citus_internal.stop_replication_origin_tracking();");
ExecuteCriticalRemoteCommand(connection,
replicationOriginSessionResetQuery->data);
connection->isReplicationOriginSessionSetup = false;
@ -229,7 +229,7 @@ IsRemoteReplicationOriginSessionSetup(MultiConnection *connection)
StringInfo isReplicationOriginSessionSetupQuery = makeStringInfo();
appendStringInfo(isReplicationOriginSessionSetupQuery,
"SELECT pg_catalog.citus_internal_is_replication_origin_tracking_active()");
"SELECT citus_internal.is_replication_origin_tracking_active()");
bool result =
ExecuteRemoteCommandAndCheckResult(connection,
isReplicationOriginSessionSetupQuery->data,

View File

@ -14,7 +14,7 @@ CREATE TABLE test(col_1 int);
-- not in a distributed transaction
SELECT citus_internal.add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's');
ERROR: This is an internal Citus function can only be used in a distributed transaction
SELECT citus_internal_update_relation_colocation ('test'::regclass, 1);
SELECT citus_internal.update_relation_colocation ('test'::regclass, 1);
ERROR: This is an internal Citus function can only be used in a distributed transaction
-- in a distributed transaction, but the application name is not Citus
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
@ -73,7 +73,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus_internal gpid=10000000001';
SELECT citus_internal_update_relation_colocation ('test'::regclass, 10);
SELECT citus_internal.update_relation_colocation ('test'::regclass, 10);
ERROR: must be owner of table test
ROLLBACK;
-- finally, a user can only add its own tables to the metadata
@ -525,8 +525,8 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus_internal gpid=10000000001';
SELECT citus_internal_update_relation_colocation ('test_2'::regclass, 1231231232);
citus_internal_update_relation_colocation
SELECT citus_internal.update_relation_colocation ('test_2'::regclass, 1231231232);
update_relation_colocation
---------------------------------------------------------------------
(1 row)
@ -852,7 +852,7 @@ BEGIN;
(1 row)
SET application_name to 'citus_internal gpid=10000000001';
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
SELECT citus_internal.update_relation_colocation('test_2'::regclass, 251);
ERROR: cannot colocate tables test_2 and test_3
ROLLBACK;
-- now, add few more shards for test_3 to make it colocated with test_2
@ -1112,8 +1112,8 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus_internal gpid=10000000001';
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
citus_internal_update_relation_colocation
SELECT citus_internal.update_relation_colocation('test_2'::regclass, 251);
update_relation_colocation
---------------------------------------------------------------------
(1 row)
@ -1274,7 +1274,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- so that making two tables colocated fails
UPDATE pg_dist_partition SET repmodel = 't'
WHERE logicalrelid = 'test_2'::regclass;
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
SELECT citus_internal.update_relation_colocation('test_2'::regclass, 251);
ERROR: cannot colocate tables test_2 and test_3
ROLLBACK;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
@ -1298,7 +1298,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
UPDATE pg_dist_partition SET partkey = '{VAR :varno 1 :varattno 1 :vartype 20 :vartypmod -1 :varcollid 0 :varlevelsup 1 :varnoold 1 :varoattno 1 :location -1}'
WHERE logicalrelid = 'test_2'::regclass;
\endif
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
SELECT citus_internal.update_relation_colocation('test_2'::regclass, 251);
ERROR: cannot colocate tables test_2 and test_3
ROLLBACK;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
@ -1313,7 +1313,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- so that making two tables colocated fails
UPDATE pg_dist_partition SET partmethod = ''
WHERE logicalrelid = 'test_2'::regclass;
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
SELECT citus_internal.update_relation_colocation('test_2'::regclass, 251);
ERROR: The relation "test_2" does not have a valid entry in pg_dist_partition.
ROLLBACK;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
@ -1328,7 +1328,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- so that making two tables colocated fails
UPDATE pg_dist_partition SET partmethod = 'a'
WHERE logicalrelid = 'test_2'::regclass;
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
SELECT citus_internal.update_relation_colocation('test_2'::regclass, 251);
ERROR: Updating colocation ids are only allowed for hash and single shard distributed tables: a
ROLLBACK;
-- colocated hash distributed table should have the same dist key columns

View File

@ -1439,14 +1439,18 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_internal.delete_tenant_schema(oid) void
| function citus_internal.execute_command_on_remote_nodes_as_user(text,text) void
| function citus_internal.global_blocked_processes() SETOF record
| function citus_internal.is_replication_origin_tracking_active() boolean
| function citus_internal.local_blocked_processes() SETOF record
| function citus_internal.mark_node_not_synced(integer,integer) void
| function citus_internal.mark_object_distributed(oid,text,oid,text) void
| function citus_internal.start_management_transaction(xid8) void
| function citus_internal.start_replication_origin_tracking() void
| function citus_internal.stop_replication_origin_tracking() void
| function citus_internal.unregister_tenant_schema_globally(oid,text) void
| function citus_internal.update_none_dist_table_metadata(oid,"char",bigint,boolean) void
| function citus_internal.update_placement_metadata(bigint,integer,integer) void
(24 rows)
| function citus_internal.update_relation_colocation(oid,integer) void
(28 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -84,9 +84,13 @@ ORDER BY 1;
function citus_internal.replace_isolation_tester_func()
function citus_internal.restore_isolation_tester_func()
function citus_internal.start_management_transaction(xid8)
function citus_internal.start_replication_origin_tracking()
function citus_internal.stop_replication_origin_tracking()
function citus_internal.unregister_tenant_schema_globally(oid,text)
function citus_internal.update_none_dist_table_metadata(oid,"char",bigint,boolean)
function citus_internal.update_placement_metadata(bigint,integer,integer)
function citus_internal.update_relation_colocation(oid,integer)
function citus_internal.is_replication_origin_tracking_active()
function citus_internal_add_colocation_metadata(integer,integer,integer,regtype,oid)
function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean)
function citus_internal_add_partition_metadata(regclass,"char",text,integer,"char")
@ -367,5 +371,5 @@ ORDER BY 1;
view citus_stat_tenants_local
view pg_dist_shard_placement
view time_partitions
(357 rows)
(361 rows)

View File

@ -16,7 +16,7 @@ CREATE TABLE test(col_1 int);
-- not in a distributed transaction
SELECT citus_internal.add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's');
SELECT citus_internal_update_relation_colocation ('test'::regclass, 1);
SELECT citus_internal.update_relation_colocation ('test'::regclass, 1);
-- in a distributed transaction, but the application name is not Citus
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
@ -54,7 +54,7 @@ ROLLBACK;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus_internal gpid=10000000001';
SELECT citus_internal_update_relation_colocation ('test'::regclass, 10);
SELECT citus_internal.update_relation_colocation ('test'::regclass, 10);
ROLLBACK;
-- finally, a user can only add its own tables to the metadata
@ -302,7 +302,7 @@ COMMIT;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus_internal gpid=10000000001';
SELECT citus_internal_update_relation_colocation ('test_2'::regclass, 1231231232);
SELECT citus_internal.update_relation_colocation ('test_2'::regclass, 1231231232);
ROLLBACK;
-- invalid shard ids are not allowed
@ -525,7 +525,7 @@ COMMIT;
BEGIN;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus_internal gpid=10000000001';
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
SELECT citus_internal.update_relation_colocation('test_2'::regclass, 251);
ROLLBACK;
-- now, add few more shards for test_3 to make it colocated with test_2
@ -693,7 +693,7 @@ COMMIT;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus_internal gpid=10000000001';
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
SELECT citus_internal.update_relation_colocation('test_2'::regclass, 251);
ROLLBACK;
-- try to update placements
@ -788,7 +788,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- so that making two tables colocated fails
UPDATE pg_dist_partition SET repmodel = 't'
WHERE logicalrelid = 'test_2'::regclass;
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
SELECT citus_internal.update_relation_colocation('test_2'::regclass, 251);
ROLLBACK;
@ -810,7 +810,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
WHERE logicalrelid = 'test_2'::regclass;
\endif
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
SELECT citus_internal.update_relation_colocation('test_2'::regclass, 251);
ROLLBACK;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
@ -820,7 +820,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- so that making two tables colocated fails
UPDATE pg_dist_partition SET partmethod = ''
WHERE logicalrelid = 'test_2'::regclass;
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
SELECT citus_internal.update_relation_colocation('test_2'::regclass, 251);
ROLLBACK;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
@ -830,7 +830,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- so that making two tables colocated fails
UPDATE pg_dist_partition SET partmethod = 'a'
WHERE logicalrelid = 'test_2'::regclass;
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
SELECT citus_internal.update_relation_colocation('test_2'::regclass, 251);
ROLLBACK;
-- colocated hash distributed table should have the same dist key columns