Move remaining citus_internal functions (#7478)

Moves the following functions to the Citus internal schema: 

citus_internal_local_blocked_processes
citus_internal_global_blocked_processes
citus_internal_mark_node_not_synced
citus_internal_unregister_tenant_schema_globally
citus_internal_update_none_dist_table_metadata
citus_internal_update_placement_metadata
citus_internal_update_relation_colocation
citus_internal_start_replication_origin_tracking
citus_internal_stop_replication_origin_tracking
citus_internal_is_replication_origin_tracking_active


#7405

---------

Co-authored-by: Jelte Fennema-Nio <jelte.fennema@microsoft.com>
eaydingol 2024-02-07 16:58:17 +03:00 committed by GitHub
parent 6869b3ad10
commit f01c5f2593
43 changed files with 619 additions and 79 deletions


@ -2429,7 +2429,7 @@ Cleanup records always need to be committed before creating the actual object. I
PostgreSQL supports change data capture (CDC) via the logical decoding interface. The basic idea behind logical decoding is that you make a replication connection (a special type of postgres connection) and start replication; the backend process then reads through the WAL, decodes the WAL records, and emits them over the wire in a format defined by the output plugin. If we were to use regular logical decoding on the nodes of a Citus cluster, we would see the name of the shard in each write, and internal data transfers such as shard moves would result in inserts being emitted. We use several techniques to avoid this.
All writes in PostgreSQL are marked with a replication origin (0 by default) and the decoder can make decisions on whether to emit the change based on the replication origin. We use this to filter out internal data transfers. If `citus.enable_change_data_capture` is enabled, all internal data transfers are marked with the special DoNotReplicateId replication origin by calling the `citus_internal_start_replication_origin_tracking()` UDF before writing the data. This replication origin ID is special in the sense that it does not need to be created (which prevents locking issues, especially when dropping replication origins). It is still up to the output plugin to decide what to do with changes marked as DoNotReplicateId.
All writes in PostgreSQL are marked with a replication origin (0 by default) and the decoder can make decisions on whether to emit the change based on the replication origin. We use this to filter out internal data transfers. If `citus.enable_change_data_capture` is enabled, all internal data transfers are marked with the special DoNotReplicateId replication origin by calling the `citus_internal.start_replication_origin_tracking()` UDF before writing the data. This replication origin ID is special in the sense that it does not need to be created (which prevents locking issues, especially when dropping replication origins). It is still up to the output plugin to decide what to do with changes marked as DoNotReplicateId.
We have very minimal control over replication commands like `CREATE_REPLICATION_SLOT`, since there are no direct hooks, and decoder names (e.g. “pgoutput”) are typically hard-coded in the client. The only method we found of overriding logical decoding behaviour is to overload the output plugin name in the dynamic library path.
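To make the mechanics concrete, here is a minimal sketch of how the renamed UDFs bracket an internal data transfer; running them by hand like this is an assumption for illustration, since in practice Citus issues these calls itself around its shard moves:

BEGIN;
SELECT citus_internal.start_replication_origin_tracking();
-- writes issued here carry the DoNotReplicateId replication origin,
-- so CDC output plugins can recognize and skip them
SELECT citus_internal.is_replication_origin_tracking_active(); -- expected: t
SELECT citus_internal.stop_replication_origin_tracking();
COMMIT;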


@ -1424,7 +1424,7 @@ ColocationIdUpdateCommand(Oid relationId, uint32 colocationId)
StringInfo command = makeStringInfo();
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
appendStringInfo(command,
"SELECT citus_internal_update_relation_colocation(%s::regclass, %d)",
"SELECT citus_internal.update_relation_colocation(%s::regclass, %d)",
quote_literal_cstr(qualifiedRelationName), colocationId);
return command->data;
@ -4258,7 +4258,7 @@ UpdateNoneDistTableMetadataCommand(Oid relationId, char replicationModel,
{
StringInfo command = makeStringInfo();
appendStringInfo(command,
"SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(%s, '%c', %u, %s)",
"SELECT citus_internal.update_none_dist_table_metadata(%s, '%c', %u, %s)",
RemoteTableIdExpression(relationId), replicationModel, colocationId,
autoConverted ? "true" : "false");
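For reference, when rendered these two builders produce commands of roughly the following shape; the relation name, replication model, and IDs are made-up values, and the exact output of RemoteTableIdExpression may differ:

SELECT citus_internal.update_relation_colocation('public.my_table'::regclass, 92);
SELECT citus_internal.update_none_dist_table_metadata('public.my_table'::regclass, 't', 92, false);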


@ -1692,7 +1692,7 @@ EnsureParentSessionHasExclusiveLockOnPgDistNode(pid_t parentSessionPid)
if (!parentHasExclusiveLock)
{
ereport(ERROR, (errmsg("lock is not held by the caller. Unexpected caller "
"for citus_internal_mark_node_not_synced")));
"for citus_internal.mark_node_not_synced")));
}
}


@ -2046,7 +2046,7 @@ UpdateColocatedShardPlacementMetadataOnWorkers(int64 shardId,
StringInfo updateCommand = makeStringInfo();
appendStringInfo(updateCommand,
"SELECT citus_internal_update_placement_metadata(%ld, %d, %d)",
"SELECT citus_internal.update_placement_metadata(%ld, %d, %d)",
colocatedShard->shardId,
sourceGroupId, targetGroupId);
SendCommandToWorkersWithMetadata(updateCommand->data);
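Rendered, the update command sent to metadata workers looks roughly like the following (shard and group IDs are illustrative):

SELECT citus_internal.update_placement_metadata(1420000, 1, 2);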


@ -38,3 +38,17 @@ REVOKE ALL ON FUNCTION citus_internal.start_management_transaction FROM PUBLIC;
#include "udfs/citus_internal_delete_placement_metadata/12.2-1.sql"
#include "udfs/citus_internal_delete_shard_metadata/12.2-1.sql"
#include "udfs/citus_internal_delete_tenant_schema/12.2-1.sql"
#include "udfs/citus_internal_local_blocked_processes/12.2-1.sql"
#include "udfs/citus_internal_global_blocked_processes/12.2-1.sql"
#include "udfs/citus_blocking_pids/12.2-1.sql"
#include "udfs/citus_isolation_test_session_is_blocked/12.2-1.sql"
DROP VIEW IF EXISTS pg_catalog.citus_lock_waits;
#include "udfs/citus_lock_waits/12.2-1.sql"
#include "udfs/citus_internal_mark_node_not_synced/12.2-1.sql"
#include "udfs/citus_internal_unregister_tenant_schema_globally/12.2-1.sql"
#include "udfs/citus_drop_trigger/12.2-1.sql"
#include "udfs/citus_internal_update_none_dist_table_metadata/12.2-1.sql"
#include "udfs/citus_internal_update_placement_metadata/12.2-1.sql"
#include "udfs/citus_internal_update_relation_colocation/12.2-1.sql"
#include "udfs/repl_origin_helper/12.2-1.sql"


@ -35,3 +35,19 @@ DROP FUNCTION citus_internal.delete_partition_metadata(regclass);
DROP FUNCTION citus_internal.delete_placement_metadata(bigint);
DROP FUNCTION citus_internal.delete_shard_metadata(bigint);
DROP FUNCTION citus_internal.delete_tenant_schema(oid);
DROP FUNCTION citus_internal.local_blocked_processes();
#include "../udfs/citus_blocking_pids/11.0-1.sql"
#include "../udfs/citus_isolation_test_session_is_blocked/11.1-1.sql"
DROP VIEW IF EXISTS pg_catalog.citus_lock_waits;
#include "../udfs/citus_lock_waits/11.0-1.sql"
DROP FUNCTION citus_internal.global_blocked_processes();
DROP FUNCTION citus_internal.mark_node_not_synced(int, int);
DROP FUNCTION citus_internal.unregister_tenant_schema_globally(oid, text);
#include "../udfs/citus_drop_trigger/12.0-1.sql"
DROP FUNCTION citus_internal.update_none_dist_table_metadata(oid, "char", bigint, boolean);
DROP FUNCTION citus_internal.update_placement_metadata(bigint, integer, integer);
DROP FUNCTION citus_internal.update_relation_colocation(oid, int);
DROP FUNCTION citus_internal.start_replication_origin_tracking();
DROP FUNCTION citus_internal.stop_replication_origin_tracking();
DROP FUNCTION citus_internal.is_replication_origin_tracking_active();


@ -0,0 +1,34 @@
DROP FUNCTION pg_catalog.citus_blocking_pids;
CREATE FUNCTION pg_catalog.citus_blocking_pids(pBlockedPid integer)
RETURNS int4[] AS $$
DECLARE
mLocalBlockingPids int4[];
mRemoteBlockingPids int4[];
mLocalGlobalPid int8;
BEGIN
SELECT pg_catalog.old_pg_blocking_pids(pBlockedPid) INTO mLocalBlockingPids;
IF (array_length(mLocalBlockingPids, 1) > 0) THEN
RETURN mLocalBlockingPids;
END IF;
-- pg says we're not blocked locally; check whether we're blocked globally.
SELECT global_pid INTO mLocalGlobalPid
FROM get_all_active_transactions() WHERE process_id = pBlockedPid;
SELECT array_agg(global_pid) INTO mRemoteBlockingPids FROM (
WITH activeTransactions AS (
SELECT global_pid FROM get_all_active_transactions()
), blockingTransactions AS (
SELECT blocking_global_pid FROM citus_internal.global_blocked_processes()
WHERE waiting_global_pid = mLocalGlobalPid
)
SELECT activeTransactions.global_pid FROM activeTransactions, blockingTransactions
WHERE activeTransactions.global_pid = blockingTransactions.blocking_global_pid
) AS sub;
RETURN mRemoteBlockingPids;
END;
$$ LANGUAGE plpgsql;
REVOKE ALL ON FUNCTION citus_blocking_pids(integer) FROM PUBLIC;
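A quick usage sketch, assuming a session whose PID you want to inspect (here the current backend, which will normally report no blockers):

SELECT pg_catalog.citus_blocking_pids(pg_backend_pid());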


@ -20,7 +20,7 @@ RETURNS int4[] AS $$
WITH activeTransactions AS (
SELECT global_pid FROM get_all_active_transactions()
), blockingTransactions AS (
SELECT blocking_global_pid FROM citus_internal_global_blocked_processes()
SELECT blocking_global_pid FROM citus_internal.global_blocked_processes()
WHERE waiting_global_pid = mLocalGlobalPid
)
SELECT activeTransactions.global_pid FROM activeTransactions, blockingTransactions


@ -0,0 +1,68 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_drop_trigger()
RETURNS event_trigger
LANGUAGE plpgsql
SET search_path = pg_catalog
AS $cdbdt$
DECLARE
constraint_event_count INTEGER;
v_obj record;
dropped_table_is_a_partition boolean := false;
BEGIN
FOR v_obj IN SELECT * FROM pg_event_trigger_dropped_objects()
WHERE object_type IN ('table', 'foreign table')
LOOP
-- first drop the table and metadata on the workers
-- then drop all the shards on the workers
-- finally remove the pg_dist_partition entry on the coordinator
PERFORM master_remove_distributed_table_metadata_from_workers(v_obj.objid, v_obj.schema_name, v_obj.object_name);
-- If both original and normal values are false, the dropped table was a partition
-- that was dropped as a result of its parent being dropped
-- NOTE: the other way around is not true:
-- the table being a partition doesn't imply both original and normal values are false
SELECT (v_obj.original = false AND v_obj.normal = false) INTO dropped_table_is_a_partition;
-- The partition's shards will be dropped when dropping the parent's shards, so we can skip:
-- i.e. we call citus_drop_all_shards with drop_shards_metadata_only parameter set to true
IF dropped_table_is_a_partition
THEN
PERFORM citus_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name, drop_shards_metadata_only := true);
ELSE
PERFORM citus_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name, drop_shards_metadata_only := false);
END IF;
PERFORM master_remove_partition_metadata(v_obj.objid, v_obj.schema_name, v_obj.object_name);
END LOOP;
FOR v_obj IN SELECT * FROM pg_event_trigger_dropped_objects()
LOOP
-- Remove entries from pg_catalog.pg_dist_schema for all dropped tenant schemas.
-- Also delete the corresponding colocation group from pg_catalog.pg_dist_colocation.
--
-- Although normally we automatically delete the colocation groups when they become empty,
-- we don't do so for the colocation groups that are created for tenant schemas. For this
-- reason, here we need to delete the colocation group when the tenant schema is dropped.
IF v_obj.object_type = 'schema' AND EXISTS (SELECT 1 FROM pg_catalog.pg_dist_schema WHERE schemaid = v_obj.objid)
THEN
PERFORM citus_internal.unregister_tenant_schema_globally(v_obj.objid, v_obj.object_name);
END IF;
-- remove entries from citus.pg_dist_object for all dropped root (objsubid = 0) objects
PERFORM master_unmark_object_distributed(v_obj.classid, v_obj.objid, v_obj.objsubid);
END LOOP;
SELECT COUNT(*) INTO constraint_event_count
FROM pg_event_trigger_dropped_objects()
WHERE object_type IN ('table constraint');
IF constraint_event_count > 0
THEN
-- Tell utility hook that a table constraint is dropped so we might
-- need to undistribute some of the citus local tables that are not
-- connected to any reference tables.
PERFORM notify_constraint_dropped();
END IF;
END;
$cdbdt$;
COMMENT ON FUNCTION pg_catalog.citus_drop_trigger()
IS 'perform checks and actions at the end of DROP actions';
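As a sketch of when the tenant-schema branch above fires (the schema name is hypothetical, and this assumes citus.enable_schema_based_sharding was on when the schema was created):

CREATE SCHEMA tenant_1;       -- recorded in pg_dist_schema as a tenant schema
DROP SCHEMA tenant_1 CASCADE; -- event trigger calls citus_internal.unregister_tenant_schema_globally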


@ -44,7 +44,7 @@ BEGIN
-- reason, here we need to delete the colocation group when the tenant schema is dropped.
IF v_obj.object_type = 'schema' AND EXISTS (SELECT 1 FROM pg_catalog.pg_dist_schema WHERE schemaid = v_obj.objid)
THEN
PERFORM pg_catalog.citus_internal_unregister_tenant_schema_globally(v_obj.objid, v_obj.object_name);
PERFORM citus_internal.unregister_tenant_schema_globally(v_obj.objid, v_obj.object_name);
END IF;
-- remove entries from citus.pg_dist_object for all dropped root (objsubid = 0) objects


@ -0,0 +1,35 @@
CREATE OR REPLACE FUNCTION citus_internal.global_blocked_processes(
OUT waiting_global_pid int8,
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_global_pid int8,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS $$MODULE_PATHNAME$$, $$citus_internal_global_blocked_processes$$;
COMMENT ON FUNCTION citus_internal.global_blocked_processes()
IS 'returns a global list of blocked backends originating from this node';
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_global_blocked_processes(
OUT waiting_global_pid int8,
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_global_pid int8,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS $$MODULE_PATHNAME$$, $$citus_internal_global_blocked_processes$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_global_blocked_processes()
IS 'returns a global list of blocked backends originating from this node';
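A hedged usage sketch of the relocated function, selecting a few of the OUT columns defined above:

SELECT waiting_global_pid, blocking_global_pid, blocking_transaction_waiting
FROM citus_internal.global_blocked_processes();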


@ -1,3 +1,21 @@
CREATE OR REPLACE FUNCTION citus_internal.global_blocked_processes(
OUT waiting_global_pid int8,
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_global_pid int8,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS $$MODULE_PATHNAME$$, $$citus_internal_global_blocked_processes$$;
COMMENT ON FUNCTION citus_internal.global_blocked_processes()
IS 'returns a global list of blocked backends originating from this node';
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_global_blocked_processes(
OUT waiting_global_pid int8,
OUT waiting_pid int4,


@ -0,0 +1,35 @@
CREATE OR REPLACE FUNCTION citus_internal.local_blocked_processes(
OUT waiting_global_pid int8,
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_global_pid int8,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS $$MODULE_PATHNAME$$, $$citus_internal_local_blocked_processes$$;
COMMENT ON FUNCTION citus_internal.local_blocked_processes()
IS 'returns all local lock wait chains, that start from any citus backend';
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_local_blocked_processes(
OUT waiting_global_pid int8,
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_global_pid int8,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS $$MODULE_PATHNAME$$, $$citus_internal_local_blocked_processes$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_local_blocked_processes()
IS 'returns all local lock wait chains, that start from any citus backend';


@ -1,3 +1,21 @@
CREATE OR REPLACE FUNCTION citus_internal.local_blocked_processes(
OUT waiting_global_pid int8,
OUT waiting_pid int4,
OUT waiting_node_id int4,
OUT waiting_transaction_num int8,
OUT waiting_transaction_stamp timestamptz,
OUT blocking_global_pid int8,
OUT blocking_pid int4,
OUT blocking_node_id int4,
OUT blocking_transaction_num int8,
OUT blocking_transaction_stamp timestamptz,
OUT blocking_transaction_waiting bool)
RETURNS SETOF RECORD
LANGUAGE C STRICT
AS $$MODULE_PATHNAME$$, $$citus_internal_local_blocked_processes$$;
COMMENT ON FUNCTION citus_internal.local_blocked_processes()
IS 'returns all local lock wait chains, that start from any citus backend';
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_local_blocked_processes(
OUT waiting_global_pid int8,
OUT waiting_pid int4,


@ -0,0 +1,13 @@
CREATE OR REPLACE FUNCTION citus_internal.mark_node_not_synced(parent_pid int, nodeid int)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_mark_node_not_synced$$;
COMMENT ON FUNCTION citus_internal.mark_node_not_synced(int, int)
IS 'marks given node not synced by unsetting metadatasynced column at the start of the nontransactional sync.';
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_mark_node_not_synced(parent_pid int, nodeid int)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_mark_node_not_synced$$;
COMMENT ON FUNCTION citus_internal_mark_node_not_synced(int, int)
IS 'marks given node not synced by unsetting metadatasynced column at the start of the nontransactional sync.';
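Calling this outside the nontransactional metadata sync path is expected to fail with the guard shown earlier in EnsureParentSessionHasExclusiveLockOnPgDistNode; the PID and node ID here are illustrative:

SELECT citus_internal.mark_node_not_synced(pg_backend_pid(), 2);
-- ERROR: lock is not held by the caller. Unexpected caller
--        for citus_internal.mark_node_not_synced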


@ -1,3 +1,10 @@
CREATE OR REPLACE FUNCTION citus_internal.mark_node_not_synced(parent_pid int, nodeid int)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_mark_node_not_synced$$;
COMMENT ON FUNCTION citus_internal.mark_node_not_synced(int, int)
IS 'marks given node not synced by unsetting metadatasynced column at the start of the nontransactional sync.';
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_mark_node_not_synced(parent_pid int, nodeid int)
RETURNS VOID
LANGUAGE C STRICT


@ -0,0 +1,15 @@
CREATE OR REPLACE FUNCTION citus_internal.unregister_tenant_schema_globally(schema_id Oid, schema_name text)
RETURNS void
LANGUAGE C
VOLATILE
AS 'MODULE_PATHNAME', $$citus_internal_unregister_tenant_schema_globally$$;
COMMENT ON FUNCTION citus_internal.unregister_tenant_schema_globally(schema_id Oid, schema_name text) IS
'Delete a tenant schema and the corresponding colocation group from metadata tables.';
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_unregister_tenant_schema_globally(schema_id Oid, schema_name text)
RETURNS void
LANGUAGE C
VOLATILE
AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION pg_catalog.citus_internal_unregister_tenant_schema_globally(schema_id Oid, schema_name text) IS
'Delete a tenant schema and the corresponding colocation group from metadata tables.';


@ -1,3 +1,11 @@
CREATE OR REPLACE FUNCTION citus_internal.unregister_tenant_schema_globally(schema_id Oid, schema_name text)
RETURNS void
LANGUAGE C
VOLATILE
AS 'MODULE_PATHNAME', $$citus_internal_unregister_tenant_schema_globally$$;
COMMENT ON FUNCTION citus_internal.unregister_tenant_schema_globally(schema_id Oid, schema_name text) IS
'Delete a tenant schema and the corresponding colocation group from metadata tables.';
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_unregister_tenant_schema_globally(schema_id Oid, schema_name text)
RETURNS void
LANGUAGE C


@ -0,0 +1,23 @@
CREATE OR REPLACE FUNCTION citus_internal.update_none_dist_table_metadata(
relation_id oid,
replication_model "char",
colocation_id bigint,
auto_converted boolean)
RETURNS void
LANGUAGE C
VOLATILE
AS 'MODULE_PATHNAME', $$citus_internal_update_none_dist_table_metadata$$;
COMMENT ON FUNCTION citus_internal.update_none_dist_table_metadata(oid, "char", bigint, boolean)
IS 'Update pg_dist_partition metadata table for given none-distributed table, to convert it to another type of none-distributed table.';
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_update_none_dist_table_metadata(
relation_id oid,
replication_model "char",
colocation_id bigint,
auto_converted boolean)
RETURNS void
LANGUAGE C
VOLATILE
AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION pg_catalog.citus_internal_update_none_dist_table_metadata(oid, "char", bigint, boolean)
IS 'Update pg_dist_partition metadata table for given none-distributed table, to convert it to another type of none-distributed table.';


@ -1,3 +1,15 @@
CREATE OR REPLACE FUNCTION citus_internal.update_none_dist_table_metadata(
relation_id oid,
replication_model "char",
colocation_id bigint,
auto_converted boolean)
RETURNS void
LANGUAGE C
VOLATILE
AS 'MODULE_PATHNAME', $$citus_internal_update_none_dist_table_metadata$$;
COMMENT ON FUNCTION citus_internal.update_none_dist_table_metadata(oid, "char", bigint, boolean)
IS 'Update pg_dist_partition metadata table for given none-distributed table, to convert it to another type of none-distributed table.';
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_update_none_dist_table_metadata(
relation_id oid,
replication_model "char",


@ -0,0 +1,19 @@
CREATE OR REPLACE FUNCTION citus_internal.update_placement_metadata(
shard_id bigint, source_group_id integer,
target_group_id integer)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_update_placement_metadata$$;
COMMENT ON FUNCTION citus_internal.update_placement_metadata(bigint, integer, integer) IS
'Updates into pg_dist_placement with user checks';
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_update_placement_metadata(
shard_id bigint, source_group_id integer,
target_group_id integer)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION pg_catalog.citus_internal_update_placement_metadata(bigint, integer, integer) IS
'Updates into pg_dist_placement with user checks';


@ -1,3 +1,13 @@
CREATE OR REPLACE FUNCTION citus_internal.update_placement_metadata(
shard_id bigint, source_group_id integer,
target_group_id integer)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_update_placement_metadata$$;
COMMENT ON FUNCTION citus_internal.update_placement_metadata(bigint, integer, integer) IS
'Updates into pg_dist_placement with user checks';
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_update_placement_metadata(
shard_id bigint, source_group_id integer,
target_group_id integer)


@ -0,0 +1,14 @@
CREATE OR REPLACE FUNCTION citus_internal.update_relation_colocation(relation_id Oid, target_colocation_id int)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_update_relation_colocation$$;
COMMENT ON FUNCTION citus_internal.update_relation_colocation(oid, int) IS
'Updates colocationId field of pg_dist_partition for the relation_id';
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_update_relation_colocation(relation_id Oid, target_colocation_id int)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION pg_catalog.citus_internal_update_relation_colocation(oid, int) IS
'Updates colocationId field of pg_dist_partition for the relation_id';


@ -1,3 +1,10 @@
CREATE OR REPLACE FUNCTION citus_internal.update_relation_colocation(relation_id Oid, target_colocation_id int)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_update_relation_colocation$$;
COMMENT ON FUNCTION citus_internal.update_relation_colocation(oid, int) IS
'Updates colocationId field of pg_dist_partition for the relation_id';
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_update_relation_colocation(relation_id Oid, target_colocation_id int)
RETURNS void
LANGUAGE C STRICT


@ -0,0 +1,45 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_isolation_test_session_is_blocked(pBlockedPid integer, pInterestingPids integer[])
RETURNS boolean AS $$
DECLARE
mBlockedGlobalPid int8;
workerProcessId integer := current_setting('citus.isolation_test_session_remote_process_id');
coordinatorProcessId integer := current_setting('citus.isolation_test_session_process_id');
BEGIN
IF pg_catalog.old_pg_isolation_test_session_is_blocked(pBlockedPid, pInterestingPids) THEN
RETURN true;
END IF;
-- pg says we're not blocked locally; check whether we're blocked globally.
-- Note that worker process may be blocked or waiting for a lock. So we need to
-- get transaction number for both of them. Following IF provides the transaction
-- number when the worker process waiting for other session.
IF EXISTS (SELECT 1 FROM get_global_active_transactions()
WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId) THEN
SELECT global_pid INTO mBlockedGlobalPid FROM get_global_active_transactions()
WHERE process_id = workerProcessId AND pBlockedPid = coordinatorProcessId;
ELSE
-- Check whether transactions initiated from the coordinator get locked
SELECT global_pid INTO mBlockedGlobalPid
FROM get_all_active_transactions() WHERE process_id = pBlockedPid;
END IF;
-- We convert the blocking_global_pid to a regular pid and only look at
-- blocks caused by the interesting pids, or the workerProcessPid. If we
-- don't do that we might find unrelated blocks caused by some random
-- other processes that are not involved in this isolation test. Because we
-- run our isolation tests on a single physical machine, the PID part of
-- the GPID is known to be unique within the whole cluster.
RETURN EXISTS (
SELECT 1 FROM citus_internal.global_blocked_processes()
WHERE waiting_global_pid = mBlockedGlobalPid
AND (
citus_pid_for_gpid(blocking_global_pid) in (
select * from unnest(pInterestingPids)
)
OR citus_pid_for_gpid(blocking_global_pid) = workerProcessId
)
);
END;
$$ LANGUAGE plpgsql;
REVOKE ALL ON FUNCTION citus_isolation_test_session_is_blocked(integer,integer[]) FROM PUBLIC;


@ -30,7 +30,7 @@ RETURNS boolean AS $$
-- run our isolation tests on a single physical machine, the PID part of
-- the GPID is known to be unique within the whole cluster.
RETURN EXISTS (
SELECT 1 FROM citus_internal_global_blocked_processes()
SELECT 1 FROM citus_internal.global_blocked_processes()
WHERE waiting_global_pid = mBlockedGlobalPid
AND (
citus_pid_for_gpid(blocking_global_pid) in (


@ -0,0 +1,47 @@
SET search_path = 'pg_catalog';
CREATE VIEW citus.citus_lock_waits AS
WITH
unique_global_wait_edges_with_calculated_gpids AS (
SELECT
-- if global_pid is NULL, it is most likely that a backend is blocked on a DDL
-- also for legacy reasons citus_internal.global_blocked_processes() returns groupId, we replace that with nodeIds
case WHEN waiting_global_pid !=0 THEN waiting_global_pid ELSE citus_calculate_gpid(get_nodeid_for_groupid(waiting_node_id), waiting_pid) END waiting_global_pid,
case WHEN blocking_global_pid !=0 THEN blocking_global_pid ELSE citus_calculate_gpid(get_nodeid_for_groupid(blocking_node_id), blocking_pid) END blocking_global_pid,
-- citus_internal.global_blocked_processes returns groupId, we replace it here with actual
-- nodeId to be consistent with the other views
get_nodeid_for_groupid(blocking_node_id) as blocking_node_id,
get_nodeid_for_groupid(waiting_node_id) as waiting_node_id,
blocking_transaction_waiting
FROM citus_internal.global_blocked_processes()
),
unique_global_wait_edges AS
(
SELECT DISTINCT ON(waiting_global_pid, blocking_global_pid) * FROM unique_global_wait_edges_with_calculated_gpids
),
citus_dist_stat_activity_with_calculated_gpids AS
(
-- if global_pid is NULL, it is most likely that a backend is blocked on a DDL
SELECT CASE WHEN global_pid != 0 THEN global_pid ELSE citus_calculate_gpid(nodeid, pid) END global_pid, nodeid, pid, query FROM citus_dist_stat_activity
)
SELECT
waiting.global_pid as waiting_gpid,
blocking.global_pid as blocking_gpid,
waiting.query AS blocked_statement,
blocking.query AS current_statement_in_blocking_process,
waiting.nodeid AS waiting_nodeid,
blocking.nodeid AS blocking_nodeid
FROM
unique_global_wait_edges
JOIN
citus_dist_stat_activity_with_calculated_gpids waiting ON (unique_global_wait_edges.waiting_global_pid = waiting.global_pid)
JOIN
citus_dist_stat_activity_with_calculated_gpids blocking ON (unique_global_wait_edges.blocking_global_pid = blocking.global_pid);
ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog;
GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC;
RESET search_path;
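Once the view is recreated on top of citus_internal.global_blocked_processes(), it can be queried as before; a minimal sketch using the columns defined above:

SELECT waiting_gpid, blocking_gpid, blocked_statement,
       current_statement_in_blocking_process
FROM pg_catalog.citus_lock_waits;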


@ -5,18 +5,18 @@ WITH
unique_global_wait_edges_with_calculated_gpids AS (
SELECT
-- if global_pid is NULL, it is most likely that a backend is blocked on a DDL
-- also for legacy reasons citus_internal_global_blocked_processes() returns groupId, we replace that with nodeIds
-- also for legacy reasons citus_internal.global_blocked_processes() returns groupId, we replace that with nodeIds
case WHEN waiting_global_pid !=0 THEN waiting_global_pid ELSE citus_calculate_gpid(get_nodeid_for_groupid(waiting_node_id), waiting_pid) END waiting_global_pid,
case WHEN blocking_global_pid !=0 THEN blocking_global_pid ELSE citus_calculate_gpid(get_nodeid_for_groupid(blocking_node_id), blocking_pid) END blocking_global_pid,
-- citus_internal_global_blocked_processes returns groupId, we replace it here with actual
-- citus_internal.global_blocked_processes returns groupId, we replace it here with actual
-- nodeId to be consisten with the other views
get_nodeid_for_groupid(blocking_node_id) as blocking_node_id,
get_nodeid_for_groupid(waiting_node_id) as waiting_node_id,
blocking_transaction_waiting
FROM citus_internal_global_blocked_processes()
FROM citus_internal.global_blocked_processes()
),
unique_global_wait_edges AS
(


@ -0,0 +1,41 @@
CREATE OR REPLACE FUNCTION citus_internal.start_replication_origin_tracking()
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_start_replication_origin_tracking$$;
COMMENT ON FUNCTION citus_internal.start_replication_origin_tracking()
IS 'To start replication origin tracking for skipping publishing of duplicated events during internal data movements for CDC';
CREATE OR REPLACE FUNCTION citus_internal.stop_replication_origin_tracking()
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_stop_replication_origin_tracking$$;
COMMENT ON FUNCTION citus_internal.stop_replication_origin_tracking()
IS 'To stop replication origin tracking for skipping publishing of duplicated events during internal data movements for CDC';
CREATE OR REPLACE FUNCTION citus_internal.is_replication_origin_tracking_active()
RETURNS boolean
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_is_replication_origin_tracking_active$$;
COMMENT ON FUNCTION citus_internal.is_replication_origin_tracking_active()
IS 'To check if replication origin tracking is active for skipping publishing of duplicated events during internal data movements for CDC';
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_start_replication_origin_tracking()
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_start_replication_origin_tracking$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_start_replication_origin_tracking()
IS 'To start replication origin tracking for skipping publishing of duplicated events during internal data movements for CDC';
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_stop_replication_origin_tracking()
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_stop_replication_origin_tracking$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_stop_replication_origin_tracking()
IS 'To stop replication origin tracking for skipping publishing of duplicated events during internal data movements for CDC';
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_is_replication_origin_tracking_active()
RETURNS boolean
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_is_replication_origin_tracking_active$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_is_replication_origin_tracking_active()
IS 'To check if replication origin tracking is active for skipping publishing of duplicated events during internal data movements for CDC';


@ -1,3 +1,24 @@
CREATE OR REPLACE FUNCTION citus_internal.start_replication_origin_tracking()
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_start_replication_origin_tracking$$;
COMMENT ON FUNCTION citus_internal.start_replication_origin_tracking()
IS 'To start replication origin tracking for skipping publishing of duplicated events during internal data movements for CDC';
CREATE OR REPLACE FUNCTION citus_internal.stop_replication_origin_tracking()
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_stop_replication_origin_tracking$$;
COMMENT ON FUNCTION citus_internal.stop_replication_origin_tracking()
IS 'To stop replication origin tracking for skipping publishing of duplicated events during internal data movements for CDC';
CREATE OR REPLACE FUNCTION citus_internal.is_replication_origin_tracking_active()
RETURNS boolean
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$citus_internal_is_replication_origin_tracking_active$$;
COMMENT ON FUNCTION citus_internal.is_replication_origin_tracking_active()
IS 'To check if replication origin tracking is active for skipping publishing of duplicated events during internal data movements for CDC';
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_start_replication_origin_tracking()
RETURNS void
LANGUAGE C STRICT


@ -192,7 +192,7 @@ BuildGlobalWaitGraph(bool onlyDistributedTx)
"waiting_node_id, waiting_transaction_num, waiting_transaction_stamp, "
"blocking_global_pid,blocking_pid, blocking_node_id, "
"blocking_transaction_num, blocking_transaction_stamp, blocking_transaction_waiting "
"FROM citus_internal_local_blocked_processes()");
"FROM citus_internal.local_blocked_processes()");
}
int querySent = SendRemoteCommand(connection, queryString->data);
@ -226,7 +226,7 @@ BuildGlobalWaitGraph(bool onlyDistributedTx)
else if (!onlyDistributedTx && colCount != 11)
{
ereport(WARNING, (errmsg("unexpected number of columns from "
"citus_internal_local_blocked_processes")));
"citus_internal.local_blocked_processes")));
continue;
}


@ -186,7 +186,7 @@ SetupReplicationOriginRemoteSession(MultiConnection *connection)
{
StringInfo replicationOriginSessionSetupQuery = makeStringInfo();
appendStringInfo(replicationOriginSessionSetupQuery,
"select pg_catalog.citus_internal_start_replication_origin_tracking();");
"select citus_internal.start_replication_origin_tracking();");
ExecuteCriticalRemoteCommand(connection,
replicationOriginSessionSetupQuery->data);
connection->isReplicationOriginSessionSetup = true;
@ -205,7 +205,7 @@ ResetReplicationOriginRemoteSession(MultiConnection *connection)
{
StringInfo replicationOriginSessionResetQuery = makeStringInfo();
appendStringInfo(replicationOriginSessionResetQuery,
"select pg_catalog.citus_internal_stop_replication_origin_tracking();");
"select citus_internal.stop_replication_origin_tracking();");
ExecuteCriticalRemoteCommand(connection,
replicationOriginSessionResetQuery->data);
connection->isReplicationOriginSessionSetup = false;
@ -229,7 +229,7 @@ IsRemoteReplicationOriginSessionSetup(MultiConnection *connection)
StringInfo isReplicationOriginSessionSetupQuery = makeStringInfo();
appendStringInfo(isReplicationOriginSessionSetupQuery,
"SELECT pg_catalog.citus_internal_is_replication_origin_tracking_active()");
"SELECT citus_internal.is_replication_origin_tracking_active()");
bool result =
ExecuteRemoteCommandAndCheckResult(connection,
isReplicationOriginSessionSetupQuery->data,


@ -189,7 +189,7 @@ extern void SendInterTableRelationshipCommands(MetadataSyncContext *context);
#define WORKER_DROP_ALL_SHELL_TABLES \
"CALL pg_catalog.worker_drop_all_shell_tables(%s)"
#define CITUS_INTERNAL_MARK_NODE_NOT_SYNCED \
"SELECT citus_internal_mark_node_not_synced(%d, %d)"
"SELECT citus_internal.mark_node_not_synced(%d, %d)"
#define REMOVE_ALL_CITUS_TABLES_COMMAND \
"SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition"


@ -369,7 +369,7 @@ ROLLBACK;
\set VERBOSITY DEFAULT
-- Test the UDFs that we use to convert Citus local tables to single-shard tables and
-- reference tables.
SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(1, 't', 1, true);
SELECT citus_internal.update_none_dist_table_metadata(1, 't', 1, true);
ERROR: This is an internal Citus function can only be used in a distributed transaction
SELECT citus_internal.delete_placement_metadata(1);
ERROR: This is an internal Citus function can only be used in a distributed transaction
@ -393,13 +393,13 @@ SET citus.next_shard_id TO 1850000;
SET citus.next_placement_id TO 8510000;
SET citus.shard_replication_factor TO 1;
SET search_path TO create_ref_dist_from_citus_local;
SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(null, 't', 1, true);
SELECT citus_internal.update_none_dist_table_metadata(null, 't', 1, true);
ERROR: relation_id cannot be NULL
SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(1, null, 1, true);
SELECT citus_internal.update_none_dist_table_metadata(1, null, 1, true);
ERROR: replication_model cannot be NULL
SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(1, 't', null, true);
SELECT citus_internal.update_none_dist_table_metadata(1, 't', null, true);
ERROR: colocation_id cannot be NULL
SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(1, 't', 1, null);
SELECT citus_internal.update_none_dist_table_metadata(1, 't', 1, null);
ERROR: auto_converted cannot be NULL
SELECT citus_internal.delete_placement_metadata(null);
ERROR: placement_id cannot be NULL
@ -411,8 +411,8 @@ SELECT citus_add_local_table_to_metadata('udf_test');
(1 row)
BEGIN;
SELECT pg_catalog.citus_internal_update_none_dist_table_metadata('create_ref_dist_from_citus_local.udf_test'::regclass, 'k', 99999, true);
citus_internal_update_none_dist_table_metadata
SELECT citus_internal.update_none_dist_table_metadata('create_ref_dist_from_citus_local.udf_test'::regclass, 'k', 99999, true);
update_none_dist_table_metadata
---------------------------------------------------------------------
(1 row)


@ -138,7 +138,7 @@ step s2-view-worker:
('%pg_prepared_xacts%'),
('%COMMIT%'),
('%dump_local_%'),
('%citus_internal_local_blocked_processes%'),
('%citus_internal.local_blocked_processes%'),
('%add_node%'),
('%csa_from_one_node%'),
('%pg_locks%'))


@ -14,7 +14,7 @@ CREATE TABLE test(col_1 int);
-- not in a distributed transaction
SELECT citus_internal.add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's');
ERROR: This is an internal Citus function can only be used in a distributed transaction
SELECT citus_internal_update_relation_colocation ('test'::regclass, 1);
SELECT citus_internal.update_relation_colocation ('test'::regclass, 1);
ERROR: This is an internal Citus function can only be used in a distributed transaction
-- in a distributed transaction, but the application name is not Citus
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
@ -73,7 +73,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus_internal gpid=10000000001';
SELECT citus_internal_update_relation_colocation ('test'::regclass, 10);
SELECT citus_internal.update_relation_colocation ('test'::regclass, 10);
ERROR: must be owner of table test
ROLLBACK;
-- finally, a user can only add its own tables to the metadata
@ -349,7 +349,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET application_name to 'citus_internal gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_update_placement_metadata(1420007, 10000, 11111);
SELECT citus_internal.update_placement_metadata(1420007, 10000, 11111);
ERROR: could not find valid entry for shard xxxxx
ROLLBACK;
-- non-existing users should fail to pass the checks
@ -525,8 +525,8 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus_internal gpid=10000000001';
SELECT citus_internal_update_relation_colocation ('test_2'::regclass, 1231231232);
citus_internal_update_relation_colocation
SELECT citus_internal.update_relation_colocation ('test_2'::regclass, 1231231232);
update_relation_colocation
---------------------------------------------------------------------
(1 row)
@ -852,7 +852,7 @@ BEGIN;
(1 row)
SET application_name to 'citus_internal gpid=10000000001';
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
SELECT citus_internal.update_relation_colocation('test_2'::regclass, 251);
ERROR: cannot colocate tables test_2 and test_3
ROLLBACK;
-- now, add few more shards for test_3 to make it colocated with test_2
@ -1112,8 +1112,8 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SET application_name to 'citus_internal gpid=10000000001';
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
citus_internal_update_relation_colocation
SELECT citus_internal.update_relation_colocation('test_2'::regclass, 251);
update_relation_colocation
---------------------------------------------------------------------
(1 row)
@ -1130,7 +1130,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET application_name to 'citus_internal gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_update_placement_metadata(1420000, get_node_id(), get_node_id()+1000);
SELECT citus_internal.update_placement_metadata(1420000, get_node_id(), get_node_id()+1000);
ERROR: Node with group id 1014 for shard placement xxxxx does not exist
COMMIT;
-- fails because the source node doesn't contain the shard
@ -1143,7 +1143,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET application_name to 'citus_internal gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_update_placement_metadata(1420000, get_node_id()+10000, get_node_id());
SELECT citus_internal.update_placement_metadata(1420000, get_node_id()+10000, get_node_id());
ERROR: Active placement for shard xxxxx is not found on group:14
COMMIT;
-- fails because shard does not exist
@ -1156,7 +1156,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET application_name to 'citus_internal gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_update_placement_metadata(0, get_node_id(), get_node_id()+1);
SELECT citus_internal.update_placement_metadata(0, get_node_id(), get_node_id()+1);
ERROR: Shard id does not exists: 0
COMMIT;
-- fails because none-existing shard
@ -1169,7 +1169,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET application_name to 'citus_internal gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_update_placement_metadata(213123123123, get_node_id(), get_node_id()+1);
SELECT citus_internal.update_placement_metadata(213123123123, get_node_id(), get_node_id()+1);
ERROR: Shard id does not exists: 213123123123
COMMIT;
-- fails because we do not own the shard
@ -1182,7 +1182,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET application_name to 'citus_internal gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_update_placement_metadata(1420007, get_node_id(), get_node_id()+1);
SELECT citus_internal.update_placement_metadata(1420007, get_node_id(), get_node_id()+1);
ERROR: must be owner of table super_user_table
COMMIT;
-- the user only allowed to delete their own shards
@ -1274,7 +1274,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- so that making two tables colocated fails
UPDATE pg_dist_partition SET repmodel = 't'
WHERE logicalrelid = 'test_2'::regclass;
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
SELECT citus_internal.update_relation_colocation('test_2'::regclass, 251);
ERROR: cannot colocate tables test_2 and test_3
ROLLBACK;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
@ -1298,7 +1298,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
UPDATE pg_dist_partition SET partkey = '{VAR :varno 1 :varattno 1 :vartype 20 :vartypmod -1 :varcollid 0 :varlevelsup 1 :varnoold 1 :varoattno 1 :location -1}'
WHERE logicalrelid = 'test_2'::regclass;
\endif
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
SELECT citus_internal.update_relation_colocation('test_2'::regclass, 251);
ERROR: cannot colocate tables test_2 and test_3
ROLLBACK;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
@ -1313,7 +1313,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- so that making two tables colocated fails
UPDATE pg_dist_partition SET partmethod = ''
WHERE logicalrelid = 'test_2'::regclass;
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
SELECT citus_internal.update_relation_colocation('test_2'::regclass, 251);
ERROR: The relation "test_2" does not have a valid entry in pg_dist_partition.
ROLLBACK;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
@ -1328,7 +1328,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- so that making two tables colocated fails
UPDATE pg_dist_partition SET partmethod = 'a'
WHERE logicalrelid = 'test_2'::regclass;
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
SELECT citus_internal.update_relation_colocation('test_2'::regclass, 251);
ERROR: Updating colocation ids are only allowed for hash and single shard distributed tables: a
ROLLBACK;
-- colocated hash distributed table should have the same dist key columns

View File

@ -1438,9 +1438,19 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_internal.delete_shard_metadata(bigint) void
| function citus_internal.delete_tenant_schema(oid) void
| function citus_internal.execute_command_on_remote_nodes_as_user(text,text) void
| function citus_internal.global_blocked_processes() SETOF record
| function citus_internal.is_replication_origin_tracking_active() boolean
| function citus_internal.local_blocked_processes() SETOF record
| function citus_internal.mark_node_not_synced(integer,integer) void
| function citus_internal.mark_object_distributed(oid,text,oid,text) void
| function citus_internal.start_management_transaction(xid8) void
(18 rows)
| function citus_internal.start_replication_origin_tracking() void
| function citus_internal.stop_replication_origin_tracking() void
| function citus_internal.unregister_tenant_schema_globally(oid,text) void
| function citus_internal.update_none_dist_table_metadata(oid,"char",bigint,boolean) void
| function citus_internal.update_placement_metadata(bigint,integer,integer) void
| function citus_internal.update_relation_colocation(oid,integer) void
(28 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version


@ -19,13 +19,13 @@ SELECT citus_internal.add_tenant_schema(1, NULL);
ERROR: colocation_id cannot be NULL
SELECT citus_internal.delete_tenant_schema(NULL);
ERROR: schema_id cannot be NULL
SELECT citus_internal_unregister_tenant_schema_globally(1, NULL);
SELECT citus_internal.unregister_tenant_schema_globally(1, NULL);
ERROR: schema_name cannot be NULL
SELECT citus_internal_unregister_tenant_schema_globally(NULL, 'text');
SELECT citus_internal.unregister_tenant_schema_globally(NULL, 'text');
ERROR: schema_id cannot be NULL
-- Verify that citus_internal_unregister_tenant_schema_globally can only
-- Verify that citus_internal.unregister_tenant_schema_globally can only
-- be called on schemas that are dropped already.
SELECT citus_internal_unregister_tenant_schema_globally('regular_schema'::regnamespace, 'regular_schema');
SELECT citus_internal.unregister_tenant_schema_globally('regular_schema'::regnamespace, 'regular_schema');
ERROR: schema is expected to be already dropped because this function is only expected to be called from Citus drop hook
SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);
?column?
@ -1511,10 +1511,10 @@ SELECT pg_reload_conf();
t
(1 row)
-- Verify that citus_internal_unregister_tenant_schema_globally is a no-op
-- Verify that citus_internal.unregister_tenant_schema_globally is a no-op
-- on workers.
SELECT citus_internal_unregister_tenant_schema_globally('tenant_3'::regnamespace, 'tenant_3');
citus_internal_unregister_tenant_schema_globally
SELECT citus_internal.unregister_tenant_schema_globally('tenant_3'::regnamespace, 'tenant_3');
unregister_tenant_schema_globally
---------------------------------------------------------------------
(1 row)


@ -73,6 +73,10 @@ ORDER BY 1;
function citus_internal.delete_tenant_schema(oid)
function citus_internal.execute_command_on_remote_nodes_as_user(text,text)
function citus_internal.find_groupid_for_node(text,integer)
function citus_internal.global_blocked_processes()
function citus_internal.is_replication_origin_tracking_active()
function citus_internal.local_blocked_processes()
function citus_internal.mark_node_not_synced(integer,integer)
function citus_internal.mark_object_distributed(oid,text,oid,text)
function citus_internal.pg_dist_node_trigger_func()
function citus_internal.pg_dist_rebalance_strategy_trigger_func()
@ -81,6 +85,12 @@ ORDER BY 1;
function citus_internal.replace_isolation_tester_func()
function citus_internal.restore_isolation_tester_func()
function citus_internal.start_management_transaction(xid8)
function citus_internal.start_replication_origin_tracking()
function citus_internal.stop_replication_origin_tracking()
function citus_internal.unregister_tenant_schema_globally(oid,text)
function citus_internal.update_none_dist_table_metadata(oid,"char",bigint,boolean)
function citus_internal.update_placement_metadata(bigint,integer,integer)
function citus_internal.update_relation_colocation(oid,integer)
function citus_internal_add_colocation_metadata(integer,integer,integer,regtype,oid)
function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean)
function citus_internal_add_partition_metadata(regclass,"char",text,integer,"char")
@ -361,5 +371,5 @@ ORDER BY 1;
view citus_stat_tenants_local
view pg_dist_shard_placement
view time_partitions
(351 rows)
(361 rows)


@ -90,7 +90,7 @@ step "s2-view-worker"
('%pg_prepared_xacts%'),
('%COMMIT%'),
('%dump_local_%'),
('%citus_internal_local_blocked_processes%'),
('%citus_internal.local_blocked_processes%'),
('%add_node%'),
('%csa_from_one_node%'),
('%pg_locks%'))


@ -219,7 +219,7 @@ ROLLBACK;
-- Test the UDFs that we use to convert Citus local tables to single-shard tables and
-- reference tables.
SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(1, 't', 1, true);
SELECT citus_internal.update_none_dist_table_metadata(1, 't', 1, true);
SELECT citus_internal.delete_placement_metadata(1);
CREATE ROLE test_user_create_ref_dist WITH LOGIN;
@ -234,10 +234,10 @@ SET citus.next_placement_id TO 8510000;
SET citus.shard_replication_factor TO 1;
SET search_path TO create_ref_dist_from_citus_local;
SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(null, 't', 1, true);
SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(1, null, 1, true);
SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(1, 't', null, true);
SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(1, 't', 1, null);
SELECT citus_internal.update_none_dist_table_metadata(null, 't', 1, true);
SELECT citus_internal.update_none_dist_table_metadata(1, null, 1, true);
SELECT citus_internal.update_none_dist_table_metadata(1, 't', null, true);
SELECT citus_internal.update_none_dist_table_metadata(1, 't', 1, null);
SELECT citus_internal.delete_placement_metadata(null);
@ -245,7 +245,7 @@ CREATE TABLE udf_test (col_1 int);
SELECT citus_add_local_table_to_metadata('udf_test');
BEGIN;
SELECT pg_catalog.citus_internal_update_none_dist_table_metadata('create_ref_dist_from_citus_local.udf_test'::regclass, 'k', 99999, true);
SELECT citus_internal.update_none_dist_table_metadata('create_ref_dist_from_citus_local.udf_test'::regclass, 'k', 99999, true);
SELECT COUNT(*)=1 FROM pg_dist_partition
WHERE logicalrelid = 'create_ref_dist_from_citus_local.udf_test'::regclass AND repmodel = 'k' AND colocationid = 99999 AND autoconverted = true;


@ -16,7 +16,7 @@ CREATE TABLE test(col_1 int);
-- not in a distributed transaction
SELECT citus_internal.add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's');
SELECT citus_internal_update_relation_colocation ('test'::regclass, 1);
SELECT citus_internal.update_relation_colocation ('test'::regclass, 1);
-- in a distributed transaction, but the application name is not Citus
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
@ -54,7 +54,7 @@ ROLLBACK;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus_internal gpid=10000000001';
SELECT citus_internal_update_relation_colocation ('test'::regclass, 10);
SELECT citus_internal.update_relation_colocation ('test'::regclass, 10);
ROLLBACK;
-- finally, a user can only add its own tables to the metadata
@ -202,7 +202,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus_internal gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_update_placement_metadata(1420007, 10000, 11111);
SELECT citus_internal.update_placement_metadata(1420007, 10000, 11111);
ROLLBACK;
-- non-existing users should fail to pass the checks
@ -302,7 +302,7 @@ COMMIT;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus_internal gpid=10000000001';
SELECT citus_internal_update_relation_colocation ('test_2'::regclass, 1231231232);
SELECT citus_internal.update_relation_colocation ('test_2'::regclass, 1231231232);
ROLLBACK;
-- invalid shard ids are not allowed
@ -525,7 +525,7 @@ COMMIT;
BEGIN;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus_internal gpid=10000000001';
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
SELECT citus_internal.update_relation_colocation('test_2'::regclass, 251);
ROLLBACK;
-- now, add few more shards for test_3 to make it colocated with test_2
@ -693,7 +693,7 @@ COMMIT;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus_internal gpid=10000000001';
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
SELECT citus_internal.update_relation_colocation('test_2'::regclass, 251);
ROLLBACK;
-- try to update placements
@ -703,7 +703,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus_internal gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_update_placement_metadata(1420000, get_node_id(), get_node_id()+1000);
SELECT citus_internal.update_placement_metadata(1420000, get_node_id(), get_node_id()+1000);
COMMIT;
-- fails because the source node doesn't contain the shard
@ -711,7 +711,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus_internal gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_update_placement_metadata(1420000, get_node_id()+10000, get_node_id());
SELECT citus_internal.update_placement_metadata(1420000, get_node_id()+10000, get_node_id());
COMMIT;
-- fails because shard does not exist
@ -719,7 +719,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus_internal gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_update_placement_metadata(0, get_node_id(), get_node_id()+1);
SELECT citus_internal.update_placement_metadata(0, get_node_id(), get_node_id()+1);
COMMIT;
-- fails because none-existing shard
@ -727,7 +727,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus_internal gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_update_placement_metadata(213123123123, get_node_id(), get_node_id()+1);
SELECT citus_internal.update_placement_metadata(213123123123, get_node_id(), get_node_id()+1);
COMMIT;
-- fails because we do not own the shard
@ -735,7 +735,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus_internal gpid=10000000001';
\set VERBOSITY terse
SELECT citus_internal_update_placement_metadata(1420007, get_node_id(), get_node_id()+1);
SELECT citus_internal.update_placement_metadata(1420007, get_node_id(), get_node_id()+1);
COMMIT;
-- the user only allowed to delete their own shards
@ -788,7 +788,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- so that making two tables colocated fails
UPDATE pg_dist_partition SET repmodel = 't'
WHERE logicalrelid = 'test_2'::regclass;
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
SELECT citus_internal.update_relation_colocation('test_2'::regclass, 251);
ROLLBACK;
@ -810,7 +810,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
WHERE logicalrelid = 'test_2'::regclass;
\endif
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
SELECT citus_internal.update_relation_colocation('test_2'::regclass, 251);
ROLLBACK;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
@ -820,7 +820,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- so that making two tables colocated fails
UPDATE pg_dist_partition SET partmethod = ''
WHERE logicalrelid = 'test_2'::regclass;
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
SELECT citus_internal.update_relation_colocation('test_2'::regclass, 251);
ROLLBACK;
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
@ -830,7 +830,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
-- so that making two tables colocated fails
UPDATE pg_dist_partition SET partmethod = 'a'
WHERE logicalrelid = 'test_2'::regclass;
SELECT citus_internal_update_relation_colocation('test_2'::regclass, 251);
SELECT citus_internal.update_relation_colocation('test_2'::regclass, 251);
ROLLBACK;
-- colocated hash distributed table should have the same dist key columns


@ -15,12 +15,12 @@ SET client_min_messages TO NOTICE;
SELECT citus_internal.add_tenant_schema(NULL, 1);
SELECT citus_internal.add_tenant_schema(1, NULL);
SELECT citus_internal.delete_tenant_schema(NULL);
SELECT citus_internal_unregister_tenant_schema_globally(1, NULL);
SELECT citus_internal_unregister_tenant_schema_globally(NULL, 'text');
SELECT citus_internal.unregister_tenant_schema_globally(1, NULL);
SELECT citus_internal.unregister_tenant_schema_globally(NULL, 'text');
-- Verify that citus_internal_unregister_tenant_schema_globally can only
-- Verify that citus_internal.unregister_tenant_schema_globally can only
-- be called on schemas that are dropped already.
SELECT citus_internal_unregister_tenant_schema_globally('regular_schema'::regnamespace, 'regular_schema');
SELECT citus_internal.unregister_tenant_schema_globally('regular_schema'::regnamespace, 'regular_schema');
SELECT 1 FROM citus_remove_node('localhost', :worker_2_port);
@ -1022,9 +1022,9 @@ SELECT pg_reload_conf();
ALTER SYSTEM SET citus.enable_schema_based_sharding TO ON;
SELECT pg_reload_conf();
-- Verify that citus_internal_unregister_tenant_schema_globally is a no-op
-- Verify that citus_internal.unregister_tenant_schema_globally is a no-op
-- on workers.
SELECT citus_internal_unregister_tenant_schema_globally('tenant_3'::regnamespace, 'tenant_3');
SELECT citus_internal.unregister_tenant_schema_globally('tenant_3'::regnamespace, 'tenant_3');
\c - - - :master_port