diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index cb64c88c7..28e54b33b 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -127,6 +127,7 @@ PG_FUNCTION_INFO_V1(citus_internal_add_partition_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata); PG_FUNCTION_INFO_V1(citus_internal_update_placement_metadata); +PG_FUNCTION_INFO_V1(citus_internal_delete_shard_metadata); static bool got_SIGTERM = false; @@ -2573,3 +2574,41 @@ citus_internal_update_placement_metadata(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + + +/* + * citus_internal_delete_shard_metadata is an internal UDF to + * delete a row in pg_dist_shard and corresponding placement rows + * from pg_dist_shard_placement. + */ +Datum +citus_internal_delete_shard_metadata(PG_FUNCTION_ARGS) +{ + int64 shardId = PG_GETARG_INT64(0); + + if (!ShouldSkipMetadataChecks()) + { + /* this UDF is not allowed allowed for executing as a separate command */ + EnsureCoordinatorInitiatedOperation(); + + if (!ShardExists(shardId)) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("Shard id does not exists: %ld", shardId))); + } + + bool missingOk = false; + EnsureShardOwner(shardId, missingOk); + } + + List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId); + ShardPlacement *shardPlacement = NULL; + foreach_ptr(shardPlacement, shardPlacementList) + { + DeleteShardPlacementRow(shardPlacement->placementId); + } + + DeleteShardRow(shardId); + + PG_RETURN_VOID(); +} diff --git a/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql b/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql index d529bf2a3..b56a3296f 100644 --- a/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql +++ b/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql @@ -14,3 +14,4 @@ ALTER TABLE pg_catalog.pg_dist_placement ADD CONSTRAINT placement_shardid_groupi #include "udfs/citus_internal_add_shard_metadata/10.2-1.sql"; #include "udfs/citus_internal_add_placement_metadata/10.2-1.sql"; #include "udfs/citus_internal_update_placement_metadata/10.2-1.sql"; +#include "udfs/citus_internal_delete_shard_metadata/10.2-1.sql"; diff --git a/src/backend/distributed/sql/downgrades/citus--10.2-1--10.1-1.sql b/src/backend/distributed/sql/downgrades/citus--10.2-1--10.1-1.sql index 31b1247d5..afb7bd195 100644 --- a/src/backend/distributed/sql/downgrades/citus--10.2-1--10.1-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--10.2-1--10.1-1.sql @@ -15,6 +15,7 @@ DROP FUNCTION pg_catalog.citus_internal_add_partition_metadata(regclass, "char", DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text); DROP FUNCTION pg_catalog.citus_internal_add_placement_metadata(bigint, integer, bigint, integer, bigint); DROP FUNCTION pg_catalog.citus_internal_update_placement_metadata(bigint, integer, integer); +DROP FUNCTION pg_catalog.citus_internal_delete_shard_metadata(bigint); REVOKE ALL ON FUNCTION pg_catalog.worker_record_sequence_dependency(regclass,regclass,name) FROM PUBLIC; ALTER TABLE pg_catalog.pg_dist_placement DROP CONSTRAINT placement_shardid_groupid_unique_index; diff --git a/src/backend/distributed/sql/udfs/citus_internal_delete_shard_metadata/10.2-1.sql b/src/backend/distributed/sql/udfs/citus_internal_delete_shard_metadata/10.2-1.sql new file mode 100644 index 000000000..7bfd86bdd --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_delete_shard_metadata/10.2-1.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_shard_metadata(shard_id bigint) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME'; +COMMENT ON FUNCTION pg_catalog.citus_internal_delete_shard_metadata(bigint) IS + 'Deletes rows from pg_dist_shard and pg_dist_shard_placement with user checks'; + diff --git a/src/backend/distributed/sql/udfs/citus_internal_delete_shard_metadata/latest.sql b/src/backend/distributed/sql/udfs/citus_internal_delete_shard_metadata/latest.sql new file mode 100644 index 000000000..7bfd86bdd --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_internal_delete_shard_metadata/latest.sql @@ -0,0 +1,7 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_shard_metadata(shard_id bigint) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME'; +COMMENT ON FUNCTION pg_catalog.citus_internal_delete_shard_metadata(bigint) IS + 'Deletes rows from pg_dist_shard and pg_dist_shard_placement with user checks'; + diff --git a/src/test/regress/expected/metadata_sync_helpers.out b/src/test/regress/expected/metadata_sync_helpers.out index 4f47f1d29..a22a596c4 100644 --- a/src/test/regress/expected/metadata_sync_helpers.out +++ b/src/test/regress/expected/metadata_sync_helpers.out @@ -886,6 +886,88 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT citus_internal_update_placement_metadata(1420007, get_node_id(), get_node_id()+1); ERROR: must be owner of table super_user_table COMMIT; +-- the user only allowed to delete their own shards +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + assign_distributed_transaction_id +--------------------------------------------------------------------- + +(1 row) + + SET application_name to 'citus'; + \set VERBOSITY terse + WITH shard_data(shardid) + AS (VALUES (1420007)) + SELECT citus_internal_delete_shard_metadata(shardid) FROM shard_data; +ERROR: must be owner of table super_user_table +ROLLBACK; +-- the user only allowed to delete shards in a distributed transaction +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SET application_name to 'citus'; + \set VERBOSITY terse + WITH shard_data(shardid) + AS (VALUES (1420007)) + SELECT citus_internal_delete_shard_metadata(shardid) FROM shard_data; +ERROR: This is an internal function that only Citus requires to use in a distributed transaction +ROLLBACK; +-- the user cannot delete non-existing shards +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + assign_distributed_transaction_id +--------------------------------------------------------------------- + +(1 row) + + SET application_name to 'citus'; + \set VERBOSITY terse + WITH shard_data(shardid) + AS (VALUES (1420100)) + SELECT citus_internal_delete_shard_metadata(shardid) FROM shard_data; +ERROR: Shard id does not exists: 1420100 +ROLLBACK; +-- sucessfully delete shards +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT count(*) FROM pg_dist_shard WHERE shardid = 1420000; + count +--------------------------------------------------------------------- + 1 +(1 row) + + SELECT count(*) FROM pg_dist_placement WHERE shardid = 1420000; + count +--------------------------------------------------------------------- + 1 +(1 row) + + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + assign_distributed_transaction_id +--------------------------------------------------------------------- + +(1 row) + + SET application_name to 'citus'; + \set VERBOSITY terse + WITH shard_data(shardid) + AS (VALUES (1420000)) + SELECT citus_internal_delete_shard_metadata(shardid) FROM shard_data; + citus_internal_delete_shard_metadata +--------------------------------------------------------------------- + +(1 row) + + SELECT count(*) FROM pg_dist_shard WHERE shardid = 1420000; + count +--------------------------------------------------------------------- + 0 +(1 row) + + SELECT count(*) FROM pg_dist_placement WHERE shardid = 1420000; + count +--------------------------------------------------------------------- + 0 +(1 row) + +ROLLBACK; -- we don't need the table/schema anymore -- connect back as super user to drop everything \c - postgres - :worker_1_port diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 88fa2c6a7..1486401ec 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -649,9 +649,10 @@ SELECT * FROM multi_extension.print_extension_changes(); | function citus_internal_add_partition_metadata(regclass,"char",text,integer,"char") void | function citus_internal_add_placement_metadata(bigint,integer,bigint,integer,bigint) void | function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text) void + | function citus_internal_delete_shard_metadata(bigint) void | function citus_internal_update_placement_metadata(bigint,integer,integer) void | function stop_metadata_sync_to_node(text,integer,boolean) void -(8 rows) +(9 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 8979f3a4a..f040a0a4a 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -69,6 +69,7 @@ ORDER BY 1; function citus_internal_add_partition_metadata(regclass,"char",text,integer,"char") function citus_internal_add_placement_metadata(bigint,integer,bigint,integer,bigint) function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text) + function citus_internal_delete_shard_metadata(bigint) function citus_internal_update_placement_metadata(bigint,integer,integer) function citus_isolation_test_session_is_blocked(integer,integer[]) function citus_json_concatenate(json,json) @@ -252,5 +253,5 @@ ORDER BY 1; view citus_worker_stat_activity view pg_dist_shard_placement view time_partitions -(236 rows) +(237 rows) diff --git a/src/test/regress/sql/metadata_sync_helpers.sql b/src/test/regress/sql/metadata_sync_helpers.sql index bcadfdeaf..58339612f 100644 --- a/src/test/regress/sql/metadata_sync_helpers.sql +++ b/src/test/regress/sql/metadata_sync_helpers.sql @@ -572,6 +572,53 @@ BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; SELECT citus_internal_update_placement_metadata(1420007, get_node_id(), get_node_id()+1); COMMIT; +-- the user only allowed to delete their own shards +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + SET application_name to 'citus'; + \set VERBOSITY terse + WITH shard_data(shardid) + AS (VALUES (1420007)) + SELECT citus_internal_delete_shard_metadata(shardid) FROM shard_data; +ROLLBACK; + +-- the user only allowed to delete shards in a distributed transaction +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SET application_name to 'citus'; + \set VERBOSITY terse + WITH shard_data(shardid) + AS (VALUES (1420007)) + SELECT citus_internal_delete_shard_metadata(shardid) FROM shard_data; +ROLLBACK; + +-- the user cannot delete non-existing shards +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + SET application_name to 'citus'; + \set VERBOSITY terse + WITH shard_data(shardid) + AS (VALUES (1420100)) + SELECT citus_internal_delete_shard_metadata(shardid) FROM shard_data; +ROLLBACK; + + +-- sucessfully delete shards +BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED; + + SELECT count(*) FROM pg_dist_shard WHERE shardid = 1420000; + SELECT count(*) FROM pg_dist_placement WHERE shardid = 1420000; + + SELECT assign_distributed_transaction_id(0, 8, '2021-07-09 15:41:55.542377+02'); + SET application_name to 'citus'; + \set VERBOSITY terse + WITH shard_data(shardid) + AS (VALUES (1420000)) + SELECT citus_internal_delete_shard_metadata(shardid) FROM shard_data; + + SELECT count(*) FROM pg_dist_shard WHERE shardid = 1420000; + SELECT count(*) FROM pg_dist_placement WHERE shardid = 1420000; +ROLLBACK; + -- we don't need the table/schema anymore -- connect back as super user to drop everything \c - postgres - :worker_1_port