Merge pull request #5127 from citusdata/get_ready_tenant_isolation

Introduce citus_internal_delete_shard_metadata
pull/5133/head
Önder Kalacı 2021-07-19 14:43:47 +02:00 committed by GitHub
commit f52db0abab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 193 additions and 7 deletions

View File

@ -127,6 +127,7 @@ PG_FUNCTION_INFO_V1(citus_internal_add_partition_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata);
PG_FUNCTION_INFO_V1(citus_internal_update_placement_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_shard_metadata);
static bool got_SIGTERM = false;
@ -2255,8 +2256,8 @@ EnsureCoordinatorInitiatedOperation(void)
GetLocalGroupId() == COORDINATOR_GROUP_ID)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("This is an internal function that only Citus "
"requires to use in a distributed transaction")));
errmsg("This is an internal Citus function can only be "
"used in a distributed transaction")));
}
}
@ -2573,3 +2574,41 @@ citus_internal_update_placement_metadata(PG_FUNCTION_ARGS)
PG_RETURN_VOID();
}
/*
* citus_internal_delete_shard_metadata is an internal UDF to
* delete a row in pg_dist_shard and corresponding placement rows
* from pg_dist_shard_placement.
*/
Datum
citus_internal_delete_shard_metadata(PG_FUNCTION_ARGS)
{
int64 shardId = PG_GETARG_INT64(0);
if (!ShouldSkipMetadataChecks())
{
/* this UDF is not allowed allowed for executing as a separate command */
EnsureCoordinatorInitiatedOperation();
if (!ShardExists(shardId))
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("Shard id does not exists: %ld", shardId)));
}
bool missingOk = false;
EnsureShardOwner(shardId, missingOk);
}
List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
ShardPlacement *shardPlacement = NULL;
foreach_ptr(shardPlacement, shardPlacementList)
{
DeleteShardPlacementRow(shardPlacement->placementId);
}
DeleteShardRow(shardId);
PG_RETURN_VOID();
}

View File

@ -14,3 +14,4 @@ ALTER TABLE pg_catalog.pg_dist_placement ADD CONSTRAINT placement_shardid_groupi
#include "udfs/citus_internal_add_shard_metadata/10.2-1.sql";
#include "udfs/citus_internal_add_placement_metadata/10.2-1.sql";
#include "udfs/citus_internal_update_placement_metadata/10.2-1.sql";
#include "udfs/citus_internal_delete_shard_metadata/10.2-1.sql";

View File

@ -15,6 +15,7 @@ DROP FUNCTION pg_catalog.citus_internal_add_partition_metadata(regclass, "char",
DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text);
DROP FUNCTION pg_catalog.citus_internal_add_placement_metadata(bigint, integer, bigint, integer, bigint);
DROP FUNCTION pg_catalog.citus_internal_update_placement_metadata(bigint, integer, integer);
DROP FUNCTION pg_catalog.citus_internal_delete_shard_metadata(bigint);
REVOKE ALL ON FUNCTION pg_catalog.worker_record_sequence_dependency(regclass,regclass,name) FROM PUBLIC;
ALTER TABLE pg_catalog.pg_dist_placement DROP CONSTRAINT placement_shardid_groupid_unique_index;

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_shard_metadata(shard_id bigint)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION pg_catalog.citus_internal_delete_shard_metadata(bigint) IS
'Deletes rows from pg_dist_shard and pg_dist_shard_placement with user checks';

View File

@ -0,0 +1,7 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_shard_metadata(shard_id bigint)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION pg_catalog.citus_internal_delete_shard_metadata(bigint) IS
'Deletes rows from pg_dist_shard and pg_dist_shard_placement with user checks';

View File

@ -13,7 +13,7 @@ SET search_path TO metadata_sync_helpers;
CREATE TABLE test(col_1 int);
-- not in a distributed transaction
SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's');
ERROR: This is an internal function that only Citus requires to use in a distributed transaction
ERROR: This is an internal Citus function can only be used in a distributed transaction
-- in a distributed transaction, but the application name is not Citus
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
@ -23,7 +23,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
(1 row)
SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's');
ERROR: This is an internal function that only Citus requires to use in a distributed transaction
ERROR: This is an internal Citus function can only be used in a distributed transaction
ROLLBACK;
-- in a distributed transaction and the application name is Citus
-- but we are on the coordinator, so still not allowed
@ -36,7 +36,7 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET application_name to 'citus';
SELECT citus_internal_add_partition_metadata ('test'::regclass, 'h', 'col_1', 0, 's');
ERROR: This is an internal function that only Citus requires to use in a distributed transaction
ERROR: This is an internal Citus function can only be used in a distributed transaction
ROLLBACK;
-- connect back as super user, and then connect to the worker
-- with the superuser to make sure we can ingest metadata with
@ -886,6 +886,88 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT citus_internal_update_placement_metadata(1420007, get_node_id(), get_node_id()+1);
ERROR: must be owner of table super_user_table
COMMIT;
-- the user only allowed to delete their own shards
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
assign_distributed_transaction_id
---------------------------------------------------------------------
(1 row)
SET application_name to 'citus';
\set VERBOSITY terse
WITH shard_data(shardid)
AS (VALUES (1420007))
SELECT citus_internal_delete_shard_metadata(shardid) FROM shard_data;
ERROR: must be owner of table super_user_table
ROLLBACK;
-- the user only allowed to delete shards in a distributed transaction
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET application_name to 'citus';
\set VERBOSITY terse
WITH shard_data(shardid)
AS (VALUES (1420007))
SELECT citus_internal_delete_shard_metadata(shardid) FROM shard_data;
ERROR: This is an internal Citus function can only be used in a distributed transaction
ROLLBACK;
-- the user cannot delete non-existing shards
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
assign_distributed_transaction_id
---------------------------------------------------------------------
(1 row)
SET application_name to 'citus';
\set VERBOSITY terse
WITH shard_data(shardid)
AS (VALUES (1420100))
SELECT citus_internal_delete_shard_metadata(shardid) FROM shard_data;
ERROR: Shard id does not exists: 1420100
ROLLBACK;
-- sucessfully delete shards
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT count(*) FROM pg_dist_shard WHERE shardid = 1420000;
count
---------------------------------------------------------------------
1
(1 row)
SELECT count(*) FROM pg_dist_placement WHERE shardid = 1420000;
count
---------------------------------------------------------------------
1
(1 row)
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
assign_distributed_transaction_id
---------------------------------------------------------------------
(1 row)
SET application_name to 'citus';
\set VERBOSITY terse
WITH shard_data(shardid)
AS (VALUES (1420000))
SELECT citus_internal_delete_shard_metadata(shardid) FROM shard_data;
citus_internal_delete_shard_metadata
---------------------------------------------------------------------
(1 row)
SELECT count(*) FROM pg_dist_shard WHERE shardid = 1420000;
count
---------------------------------------------------------------------
0
(1 row)
SELECT count(*) FROM pg_dist_placement WHERE shardid = 1420000;
count
---------------------------------------------------------------------
0
(1 row)
ROLLBACK;
-- we don't need the table/schema anymore
-- connect back as super user to drop everything
\c - postgres - :worker_1_port

View File

@ -649,9 +649,10 @@ SELECT * FROM multi_extension.print_extension_changes();
| function citus_internal_add_partition_metadata(regclass,"char",text,integer,"char") void
| function citus_internal_add_placement_metadata(bigint,integer,bigint,integer,bigint) void
| function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text) void
| function citus_internal_delete_shard_metadata(bigint) void
| function citus_internal_update_placement_metadata(bigint,integer,integer) void
| function stop_metadata_sync_to_node(text,integer,boolean) void
(8 rows)
(9 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

View File

@ -69,6 +69,7 @@ ORDER BY 1;
function citus_internal_add_partition_metadata(regclass,"char",text,integer,"char")
function citus_internal_add_placement_metadata(bigint,integer,bigint,integer,bigint)
function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text)
function citus_internal_delete_shard_metadata(bigint)
function citus_internal_update_placement_metadata(bigint,integer,integer)
function citus_isolation_test_session_is_blocked(integer,integer[])
function citus_json_concatenate(json,json)
@ -252,5 +253,5 @@ ORDER BY 1;
view citus_worker_stat_activity
view pg_dist_shard_placement
view time_partitions
(236 rows)
(237 rows)

View File

@ -572,6 +572,53 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT citus_internal_update_placement_metadata(1420007, get_node_id(), get_node_id()+1);
COMMIT;
-- the user only allowed to delete their own shards
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
\set VERBOSITY terse
WITH shard_data(shardid)
AS (VALUES (1420007))
SELECT citus_internal_delete_shard_metadata(shardid) FROM shard_data;
ROLLBACK;
-- the user only allowed to delete shards in a distributed transaction
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SET application_name to 'citus';
\set VERBOSITY terse
WITH shard_data(shardid)
AS (VALUES (1420007))
SELECT citus_internal_delete_shard_metadata(shardid) FROM shard_data;
ROLLBACK;
-- the user cannot delete non-existing shards
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
\set VERBOSITY terse
WITH shard_data(shardid)
AS (VALUES (1420100))
SELECT citus_internal_delete_shard_metadata(shardid) FROM shard_data;
ROLLBACK;
-- sucessfully delete shards
BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;
SELECT count(*) FROM pg_dist_shard WHERE shardid = 1420000;
SELECT count(*) FROM pg_dist_placement WHERE shardid = 1420000;
SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02');
SET application_name to 'citus';
\set VERBOSITY terse
WITH shard_data(shardid)
AS (VALUES (1420000))
SELECT citus_internal_delete_shard_metadata(shardid) FROM shard_data;
SELECT count(*) FROM pg_dist_shard WHERE shardid = 1420000;
SELECT count(*) FROM pg_dist_placement WHERE shardid = 1420000;
ROLLBACK;
-- we don't need the table/schema anymore
-- connect back as super user to drop everything
\c - postgres - :worker_1_port