Merge pull request #1056 from citusdata/feature/mx_locks

Add shard locking UDFs
pull/1010/head
Marco Slot 2016-12-22 11:20:43 +01:00 committed by GitHub
commit b7d0a3237b
8 changed files with 259 additions and 2 deletions

View File

@ -9,7 +9,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
5.2-1 5.2-2 5.2-3 5.2-4 \
6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -109,6 +109,8 @@ $(EXTENSION)--6.1-6.sql: $(EXTENSION)--6.1-5.sql $(EXTENSION)--6.1-5--6.1-6.sql
cat $^ > $@
$(EXTENSION)--6.1-7.sql: $(EXTENSION)--6.1-6.sql $(EXTENSION)--6.1-6--6.1-7.sql
cat $^ > $@
$(EXTENSION)--6.1-8.sql: $(EXTENSION)--6.1-7.sql $(EXTENSION)--6.1-7--6.1-8.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,19 @@
/* citus--6.1-4--6.1-5.sql */
SET search_path = 'pg_catalog';
CREATE FUNCTION lock_shard_resources(lock_mode int, shard_id bigint[])
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$lock_shard_resources$$;
COMMENT ON FUNCTION lock_shard_resources(lock_mode int, shard_id bigint[])
IS 'lock shard resource to serialise non-commutative writes';
CREATE FUNCTION lock_shard_metadata(lock_mode int, shard_id bigint[])
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$lock_shard_metadata$$;
COMMENT ON FUNCTION lock_shard_metadata(lock_mode int, shard_id bigint[])
IS 'lock shard metadata to prevent writes during metadata changes';
RESET search_path;

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '6.1-7'
default_version = '6.1-8'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -24,9 +24,121 @@
#include "distributed/relay_utility.h"
#include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/worker_protocol.h"
#include "storage/lmgr.h"
/* local function forward declarations */
static LOCKMODE IntToLockMode(int mode);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(lock_shard_metadata);
PG_FUNCTION_INFO_V1(lock_shard_resources);
/*
* lock_shard_metadata allows the shard distribution metadata to be locked
* remotely to block concurrent writes from workers in MX tables.
*
* This function does not sort the array to avoid deadlock, callers
* must ensure a consistent order.
*/
Datum
lock_shard_metadata(PG_FUNCTION_ARGS)
{
LOCKMODE lockMode = IntToLockMode(PG_GETARG_INT32(0));
ArrayType *shardIdArrayObject = PG_GETARG_ARRAYTYPE_P(1);
Datum *shardIdArrayDatum = NULL;
int shardIdCount = 0;
int shardIdIndex = 0;
if (ARR_NDIM(shardIdArrayObject) == 0)
{
ereport(ERROR, (errmsg("no locks specified")));
}
/* we don't want random users to block writes */
EnsureSuperUser();
shardIdCount = ArrayObjectCount(shardIdArrayObject);
shardIdArrayDatum = DeconstructArrayObject(shardIdArrayObject);
for (shardIdIndex = 0; shardIdIndex < shardIdCount; shardIdIndex++)
{
int64 shardId = DatumGetInt64(shardIdArrayDatum[shardIdIndex]);
LockShardDistributionMetadata(shardId, lockMode);
}
PG_RETURN_VOID();
}
/*
* lock_shard_resources allows shard resources to be locked
* remotely to serialise non-commutative writes on shards.
*
* This function does not sort the array to avoid deadlock, callers
* must ensure a consistent order.
*/
Datum
lock_shard_resources(PG_FUNCTION_ARGS)
{
LOCKMODE lockMode = IntToLockMode(PG_GETARG_INT32(0));
ArrayType *shardIdArrayObject = PG_GETARG_ARRAYTYPE_P(1);
Datum *shardIdArrayDatum = NULL;
int shardIdCount = 0;
int shardIdIndex = 0;
if (ARR_NDIM(shardIdArrayObject) == 0)
{
ereport(ERROR, (errmsg("no locks specified")));
}
/* we don't want random users to block writes */
EnsureSuperUser();
shardIdCount = ArrayObjectCount(shardIdArrayObject);
shardIdArrayDatum = DeconstructArrayObject(shardIdArrayObject);
for (shardIdIndex = 0; shardIdIndex < shardIdCount; shardIdIndex++)
{
int64 shardId = DatumGetInt64(shardIdArrayDatum[shardIdIndex]);
LockShardResource(shardId, lockMode);
}
PG_RETURN_VOID();
}
/*
* IntToLockMode verifies whether the specified integer is an accepted lock mode
* and returns it as a LOCKMODE enum.
*/
static LOCKMODE
IntToLockMode(int mode)
{
if (mode == ExclusiveLock)
{
return ExclusiveLock;
}
else if (mode == ShareLock)
{
return ShareLock;
}
else if (mode == AccessShareLock)
{
return AccessShareLock;
}
else
{
elog(ERROR, "unsupported lockmode %d", mode);
}
}
/*
* LockShardDistributionMetadata returns after grabbing a lock for distribution
* metadata related to the specified shard, blocking if required. Any locks

View File

@ -65,6 +65,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-4';
ALTER EXTENSION citus UPDATE TO '6.1-5';
ALTER EXTENSION citus UPDATE TO '6.1-6';
ALTER EXTENSION citus UPDATE TO '6.1-7';
ALTER EXTENSION citus UPDATE TO '6.1-8';
-- ensure no objects were created outside pg_catalog
SELECT COUNT(*)
FROM pg_depend AS pgd,

View File

@ -64,6 +64,88 @@ SELECT master_apply_delete_command('DELETE FROM sharded_table');
2
(1 row)
-- lock shard metadata: take some share locks and exclusive locks
BEGIN;
SELECT lock_shard_metadata(5, ARRAY[999001, 999002, 999002]);
lock_shard_metadata
---------------------
(1 row)
SELECT lock_shard_metadata(7, ARRAY[999001, 999003, 999004]);
lock_shard_metadata
---------------------
(1 row)
SELECT locktype, objid, mode, granted
FROM pg_locks
WHERE objid IN (999001, 999002, 999003, 999004)
ORDER BY objid, mode;
locktype | objid | mode | granted
----------+--------+---------------+---------
advisory | 999001 | ExclusiveLock | t
advisory | 999001 | ShareLock | t
advisory | 999002 | ShareLock | t
advisory | 999003 | ExclusiveLock | t
advisory | 999004 | ExclusiveLock | t
(5 rows)
END;
-- lock shard metadata: unsupported lock type
SELECT lock_shard_metadata(0, ARRAY[990001, 999002]);
ERROR: unsupported lockmode 0
-- lock shard metadata: invalid shard ID
SELECT lock_shard_metadata(5, ARRAY[0]);
lock_shard_metadata
---------------------
(1 row)
-- lock shard metadata: lock nothing
SELECT lock_shard_metadata(5, ARRAY[]::bigint[]);
ERROR: no locks specified
-- lock shard resources: take some share locks and exclusive locks
BEGIN;
SELECT lock_shard_resources(5, ARRAY[999001, 999002, 999002]);
lock_shard_resources
----------------------
(1 row)
SELECT lock_shard_resources(7, ARRAY[999001, 999003, 999004]);
lock_shard_resources
----------------------
(1 row)
SELECT locktype, objid, mode, granted
FROM pg_locks
WHERE objid IN (999001, 999002, 999003, 999004)
ORDER BY objid, mode;
locktype | objid | mode | granted
----------+--------+---------------+---------
advisory | 999001 | ExclusiveLock | t
advisory | 999001 | ShareLock | t
advisory | 999002 | ShareLock | t
advisory | 999003 | ExclusiveLock | t
advisory | 999004 | ExclusiveLock | t
(5 rows)
END;
-- lock shard metadata: unsupported lock type
SELECT lock_shard_resources(0, ARRAY[990001, 999002]);
ERROR: unsupported lockmode 0
-- lock shard metadata: invalid shard ID
SELECT lock_shard_resources(5, ARRAY[-1]);
lock_shard_resources
----------------------
(1 row)
-- lock shard metadata: lock nothing
SELECT lock_shard_resources(5, ARRAY[]::bigint[]);
ERROR: no locks specified
-- drop table
DROP TABLE sharded_table;
-- VACUUM tests

View File

@ -65,6 +65,7 @@ ALTER EXTENSION citus UPDATE TO '6.1-4';
ALTER EXTENSION citus UPDATE TO '6.1-5';
ALTER EXTENSION citus UPDATE TO '6.1-6';
ALTER EXTENSION citus UPDATE TO '6.1-7';
ALTER EXTENSION citus UPDATE TO '6.1-8';
-- ensure no objects were created outside pg_catalog
SELECT COUNT(*)

View File

@ -38,6 +38,46 @@ SELECT master_apply_delete_command('DELETE FROM sharded_table WHERE id > 0');
-- drop all shards
SELECT master_apply_delete_command('DELETE FROM sharded_table');
-- lock shard metadata: take some share locks and exclusive locks
BEGIN;
SELECT lock_shard_metadata(5, ARRAY[999001, 999002, 999002]);
SELECT lock_shard_metadata(7, ARRAY[999001, 999003, 999004]);
SELECT locktype, objid, mode, granted
FROM pg_locks
WHERE objid IN (999001, 999002, 999003, 999004)
ORDER BY objid, mode;
END;
-- lock shard metadata: unsupported lock type
SELECT lock_shard_metadata(0, ARRAY[990001, 999002]);
-- lock shard metadata: invalid shard ID
SELECT lock_shard_metadata(5, ARRAY[0]);
-- lock shard metadata: lock nothing
SELECT lock_shard_metadata(5, ARRAY[]::bigint[]);
-- lock shard resources: take some share locks and exclusive locks
BEGIN;
SELECT lock_shard_resources(5, ARRAY[999001, 999002, 999002]);
SELECT lock_shard_resources(7, ARRAY[999001, 999003, 999004]);
SELECT locktype, objid, mode, granted
FROM pg_locks
WHERE objid IN (999001, 999002, 999003, 999004)
ORDER BY objid, mode;
END;
-- lock shard metadata: unsupported lock type
SELECT lock_shard_resources(0, ARRAY[990001, 999002]);
-- lock shard metadata: invalid shard ID
SELECT lock_shard_resources(5, ARRAY[-1]);
-- lock shard metadata: lock nothing
SELECT lock_shard_resources(5, ARRAY[]::bigint[]);
-- drop table
DROP TABLE sharded_table;