From 6852f8a951916ddbcbff3cd2567b06c9a41c66db Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Wed, 21 Dec 2016 15:07:42 +0100 Subject: [PATCH] Add shard locking UDFs --- src/backend/distributed/Makefile | 4 +- .../distributed/citus--6.1-7--6.1-8.sql | 19 +++ src/backend/distributed/citus.control | 2 +- src/backend/distributed/utils/resource_lock.c | 112 ++++++++++++++++++ src/test/regress/expected/multi_extension.out | 1 + src/test/regress/expected/multi_utilities.out | 82 +++++++++++++ src/test/regress/sql/multi_extension.sql | 1 + src/test/regress/sql/multi_utilities.sql | 40 +++++++ 8 files changed, 259 insertions(+), 2 deletions(-) create mode 100644 src/backend/distributed/citus--6.1-7--6.1-8.sql diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 7b4746bb0..60c50d3e8 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -9,7 +9,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ - 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 + 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -109,6 +109,8 @@ $(EXTENSION)--6.1-6.sql: $(EXTENSION)--6.1-5.sql $(EXTENSION)--6.1-5--6.1-6.sql cat $^ > $@ $(EXTENSION)--6.1-7.sql: $(EXTENSION)--6.1-6.sql $(EXTENSION)--6.1-6--6.1-7.sql cat $^ > $@ +$(EXTENSION)--6.1-8.sql: $(EXTENSION)--6.1-7.sql $(EXTENSION)--6.1-7--6.1-8.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.1-7--6.1-8.sql b/src/backend/distributed/citus--6.1-7--6.1-8.sql new file mode 100644 index 000000000..8fff169bc --- /dev/null +++ b/src/backend/distributed/citus--6.1-7--6.1-8.sql @@ -0,0 +1,19 @@ +/* citus--6.1-4--6.1-5.sql */ + +SET search_path = 'pg_catalog'; + +CREATE FUNCTION lock_shard_resources(lock_mode int, shard_id bigint[]) + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$lock_shard_resources$$; +COMMENT ON FUNCTION lock_shard_resources(lock_mode int, shard_id bigint[]) + IS 'lock shard resource to serialise non-commutative writes'; + +CREATE FUNCTION lock_shard_metadata(lock_mode int, shard_id bigint[]) + RETURNS VOID + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$lock_shard_metadata$$; +COMMENT ON FUNCTION lock_shard_metadata(lock_mode int, shard_id bigint[]) + IS 'lock shard metadata to prevent writes during metadata changes'; + +RESET search_path; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index ca50cbb41..1b343eb0a 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.1-7' +default_version = '6.1-8' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index ce3a6ad35..0f5b75202 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -24,9 +24,121 @@ #include "distributed/relay_utility.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" +#include "distributed/worker_protocol.h" #include "storage/lmgr.h" +/* local function forward declarations */ +static LOCKMODE IntToLockMode(int mode); + + +/* exports for SQL callable functions */ +PG_FUNCTION_INFO_V1(lock_shard_metadata); +PG_FUNCTION_INFO_V1(lock_shard_resources); + + +/* + * lock_shard_metadata allows the shard distribution metadata to be locked + * remotely to block concurrent writes from workers in MX tables. + * + * This function does not sort the array to avoid deadlock, callers + * must ensure a consistent order. + */ +Datum +lock_shard_metadata(PG_FUNCTION_ARGS) +{ + LOCKMODE lockMode = IntToLockMode(PG_GETARG_INT32(0)); + ArrayType *shardIdArrayObject = PG_GETARG_ARRAYTYPE_P(1); + Datum *shardIdArrayDatum = NULL; + int shardIdCount = 0; + int shardIdIndex = 0; + + if (ARR_NDIM(shardIdArrayObject) == 0) + { + ereport(ERROR, (errmsg("no locks specified"))); + } + + /* we don't want random users to block writes */ + EnsureSuperUser(); + + shardIdCount = ArrayObjectCount(shardIdArrayObject); + shardIdArrayDatum = DeconstructArrayObject(shardIdArrayObject); + + for (shardIdIndex = 0; shardIdIndex < shardIdCount; shardIdIndex++) + { + int64 shardId = DatumGetInt64(shardIdArrayDatum[shardIdIndex]); + + LockShardDistributionMetadata(shardId, lockMode); + } + + PG_RETURN_VOID(); +} + + +/* + * lock_shard_resources allows shard resources to be locked + * remotely to serialise non-commutative writes on shards. + * + * This function does not sort the array to avoid deadlock, callers + * must ensure a consistent order. + */ +Datum +lock_shard_resources(PG_FUNCTION_ARGS) +{ + LOCKMODE lockMode = IntToLockMode(PG_GETARG_INT32(0)); + ArrayType *shardIdArrayObject = PG_GETARG_ARRAYTYPE_P(1); + Datum *shardIdArrayDatum = NULL; + int shardIdCount = 0; + int shardIdIndex = 0; + + if (ARR_NDIM(shardIdArrayObject) == 0) + { + ereport(ERROR, (errmsg("no locks specified"))); + } + + /* we don't want random users to block writes */ + EnsureSuperUser(); + + shardIdCount = ArrayObjectCount(shardIdArrayObject); + shardIdArrayDatum = DeconstructArrayObject(shardIdArrayObject); + + for (shardIdIndex = 0; shardIdIndex < shardIdCount; shardIdIndex++) + { + int64 shardId = DatumGetInt64(shardIdArrayDatum[shardIdIndex]); + + LockShardResource(shardId, lockMode); + } + + PG_RETURN_VOID(); +} + + +/* + * IntToLockMode verifies whether the specified integer is an accepted lock mode + * and returns it as a LOCKMODE enum. + */ +static LOCKMODE +IntToLockMode(int mode) +{ + if (mode == ExclusiveLock) + { + return ExclusiveLock; + } + else if (mode == ShareLock) + { + return ShareLock; + } + else if (mode == AccessShareLock) + { + return AccessShareLock; + } + else + { + elog(ERROR, "unsupported lockmode %d", mode); + } +} + + /* * LockShardDistributionMetadata returns after grabbing a lock for distribution * metadata related to the specified shard, blocking if required. Any locks diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 54a8777ce..a8f167647 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -65,6 +65,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-4'; ALTER EXTENSION citus UPDATE TO '6.1-5'; ALTER EXTENSION citus UPDATE TO '6.1-6'; ALTER EXTENSION citus UPDATE TO '6.1-7'; +ALTER EXTENSION citus UPDATE TO '6.1-8'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) FROM pg_depend AS pgd, diff --git a/src/test/regress/expected/multi_utilities.out b/src/test/regress/expected/multi_utilities.out index 5aed478a3..09e1e5491 100644 --- a/src/test/regress/expected/multi_utilities.out +++ b/src/test/regress/expected/multi_utilities.out @@ -64,6 +64,88 @@ SELECT master_apply_delete_command('DELETE FROM sharded_table'); 2 (1 row) +-- lock shard metadata: take some share locks and exclusive locks +BEGIN; +SELECT lock_shard_metadata(5, ARRAY[999001, 999002, 999002]); + lock_shard_metadata +--------------------- + +(1 row) + +SELECT lock_shard_metadata(7, ARRAY[999001, 999003, 999004]); + lock_shard_metadata +--------------------- + +(1 row) + +SELECT locktype, objid, mode, granted +FROM pg_locks +WHERE objid IN (999001, 999002, 999003, 999004) +ORDER BY objid, mode; + locktype | objid | mode | granted +----------+--------+---------------+--------- + advisory | 999001 | ExclusiveLock | t + advisory | 999001 | ShareLock | t + advisory | 999002 | ShareLock | t + advisory | 999003 | ExclusiveLock | t + advisory | 999004 | ExclusiveLock | t +(5 rows) + +END; +-- lock shard metadata: unsupported lock type +SELECT lock_shard_metadata(0, ARRAY[990001, 999002]); +ERROR: unsupported lockmode 0 +-- lock shard metadata: invalid shard ID +SELECT lock_shard_metadata(5, ARRAY[0]); + lock_shard_metadata +--------------------- + +(1 row) + +-- lock shard metadata: lock nothing +SELECT lock_shard_metadata(5, ARRAY[]::bigint[]); +ERROR: no locks specified +-- lock shard resources: take some share locks and exclusive locks +BEGIN; +SELECT lock_shard_resources(5, ARRAY[999001, 999002, 999002]); + lock_shard_resources +---------------------- + +(1 row) + +SELECT lock_shard_resources(7, ARRAY[999001, 999003, 999004]); + lock_shard_resources +---------------------- + +(1 row) + +SELECT locktype, objid, mode, granted +FROM pg_locks +WHERE objid IN (999001, 999002, 999003, 999004) +ORDER BY objid, mode; + locktype | objid | mode | granted +----------+--------+---------------+--------- + advisory | 999001 | ExclusiveLock | t + advisory | 999001 | ShareLock | t + advisory | 999002 | ShareLock | t + advisory | 999003 | ExclusiveLock | t + advisory | 999004 | ExclusiveLock | t +(5 rows) + +END; +-- lock shard metadata: unsupported lock type +SELECT lock_shard_resources(0, ARRAY[990001, 999002]); +ERROR: unsupported lockmode 0 +-- lock shard metadata: invalid shard ID +SELECT lock_shard_resources(5, ARRAY[-1]); + lock_shard_resources +---------------------- + +(1 row) + +-- lock shard metadata: lock nothing +SELECT lock_shard_resources(5, ARRAY[]::bigint[]); +ERROR: no locks specified -- drop table DROP TABLE sharded_table; -- VACUUM tests diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 8dc7a8717..89018fe21 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -65,6 +65,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-4'; ALTER EXTENSION citus UPDATE TO '6.1-5'; ALTER EXTENSION citus UPDATE TO '6.1-6'; ALTER EXTENSION citus UPDATE TO '6.1-7'; +ALTER EXTENSION citus UPDATE TO '6.1-8'; -- ensure no objects were created outside pg_catalog SELECT COUNT(*) diff --git a/src/test/regress/sql/multi_utilities.sql b/src/test/regress/sql/multi_utilities.sql index 13581353e..e0a078cc9 100644 --- a/src/test/regress/sql/multi_utilities.sql +++ b/src/test/regress/sql/multi_utilities.sql @@ -38,6 +38,46 @@ SELECT master_apply_delete_command('DELETE FROM sharded_table WHERE id > 0'); -- drop all shards SELECT master_apply_delete_command('DELETE FROM sharded_table'); +-- lock shard metadata: take some share locks and exclusive locks +BEGIN; +SELECT lock_shard_metadata(5, ARRAY[999001, 999002, 999002]); +SELECT lock_shard_metadata(7, ARRAY[999001, 999003, 999004]); + +SELECT locktype, objid, mode, granted +FROM pg_locks +WHERE objid IN (999001, 999002, 999003, 999004) +ORDER BY objid, mode; +END; + +-- lock shard metadata: unsupported lock type +SELECT lock_shard_metadata(0, ARRAY[990001, 999002]); + +-- lock shard metadata: invalid shard ID +SELECT lock_shard_metadata(5, ARRAY[0]); + +-- lock shard metadata: lock nothing +SELECT lock_shard_metadata(5, ARRAY[]::bigint[]); + +-- lock shard resources: take some share locks and exclusive locks +BEGIN; +SELECT lock_shard_resources(5, ARRAY[999001, 999002, 999002]); +SELECT lock_shard_resources(7, ARRAY[999001, 999003, 999004]); + +SELECT locktype, objid, mode, granted +FROM pg_locks +WHERE objid IN (999001, 999002, 999003, 999004) +ORDER BY objid, mode; +END; + +-- lock shard metadata: unsupported lock type +SELECT lock_shard_resources(0, ARRAY[990001, 999002]); + +-- lock shard metadata: invalid shard ID +SELECT lock_shard_resources(5, ARRAY[-1]); + +-- lock shard metadata: lock nothing +SELECT lock_shard_resources(5, ARRAY[]::bigint[]); + -- drop table DROP TABLE sharded_table;