Replace shardId lock with lock on colocation+shardIntervalIndex (#3374)

This new locking pattern makes sure that some deadlocks that could
happend during rebalancing cannot occur anymore.
pull/3373/head
Jelte Fennema 2020-01-16 13:14:01 +01:00 committed by GitHub
parent 86876c0473
commit e76281500c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 193 additions and 149 deletions

View File

@ -212,14 +212,6 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
text *minHashTokenText = IntegerToText(shardMinHashToken);
text *maxHashTokenText = IntegerToText(shardMaxHashToken);
/*
* Grabbing the shard metadata lock isn't technically necessary since
* we already hold an exclusive lock on the partition table, but we'll
* acquire it for the sake of completeness. As we're adding new active
* placements, the mode must be exclusive.
*/
LockShardDistributionMetadata(shardId, ExclusiveLock);
InsertShardRow(distributedTableId, shardId, shardStorageType,
minHashTokenText, maxHashTokenText);
@ -378,14 +370,6 @@ CreateReferenceTableShard(Oid distributedTableId)
/* get the next shard id */
uint64 shardId = GetNextShardId();
/*
* Grabbing the shard metadata lock isn't technically necessary since
* we already hold an exclusive lock on the partition table, but we'll
* acquire it for the sake of completeness. As we're adding new active
* placements, the mode must be exclusive.
*/
LockShardDistributionMetadata(shardId, ExclusiveLock);
InsertShardRow(distributedTableId, shardId, shardStorageType, shardMinValue,
shardMaxValue);

View File

@ -332,7 +332,22 @@ LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode)
const bool sessionLock = false;
const bool dontWait = false;
SET_LOCKTAG_SHARD_METADATA_RESOURCE(tag, MyDatabaseId, shardId);
ShardInterval *shardInterval = LoadShardInterval(shardId);
Oid distributedTableId = shardInterval->relationId;
DistTableCacheEntry *distributedTable = DistributedTableCacheEntry(
distributedTableId);
uint32 colocationId = distributedTable->colocationId;
if (colocationId == INVALID_COLOCATION_ID ||
distributedTable->partitionMethod != DISTRIBUTE_BY_HASH)
{
SET_LOCKTAG_SHARD_METADATA_RESOURCE(tag, MyDatabaseId, shardId);
}
else
{
SET_LOCKTAG_COLOCATED_SHARDS_METADATA_RESOURCE(tag, MyDatabaseId, colocationId,
shardInterval->shardIndex);
}
(void) LockAcquire(&tag, lockMode, sessionLock, dontWait);
}

View File

@ -36,7 +36,8 @@ typedef enum AdvisoryLocktagClass
ADV_LOCKTAG_CLASS_CITUS_SHARD_METADATA = 4,
ADV_LOCKTAG_CLASS_CITUS_SHARD = 5,
ADV_LOCKTAG_CLASS_CITUS_JOB = 6,
ADV_LOCKTAG_CLASS_CITUS_REBALANCE_COLOCATION = 7
ADV_LOCKTAG_CLASS_CITUS_REBALANCE_COLOCATION = 7,
ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA = 8,
} AdvisoryLocktagClass;
@ -48,6 +49,14 @@ typedef enum AdvisoryLocktagClass
(uint32) (shardid), \
ADV_LOCKTAG_CLASS_CITUS_SHARD_METADATA)
#define SET_LOCKTAG_COLOCATED_SHARDS_METADATA_RESOURCE(tag, db, colocationId, \
shardIntervalIndex) \
SET_LOCKTAG_ADVISORY(tag, \
db, \
(uint32) shardIntervalIndex, \
(uint32) colocationId, \
ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA)
/* reuse advisory lock, but with different, unused field 4 (5)*/
#define SET_LOCKTAG_SHARD_RESOURCE(tag, db, shardid) \
SET_LOCKTAG_ADVISORY(tag, \

View File

@ -1424,98 +1424,109 @@ SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass
(8 rows)
COMMIT;
CREATE VIEW lockinfo AS
SELECT
logicalrelid,
CASE
WHEN l.objsubid = 5 THEN 'shard'
WHEN l.objsubid = 4 THEN 'shard_metadata'
ELSE 'colocated_shards_metadata'
END AS locktype,
mode
FROM
pg_locks AS l JOIN (select row_number() over (partition by logicalrelid order by shardminvalue) -1 as shardintervalindex, * from pg_dist_shard) AS s
ON
(l.objsubid IN (4, 5) AND l.objid = s.shardid )
OR (l.objsubid = 8
AND l.objid IN (select colocationid from pg_dist_partition AS p where p.logicalrelid = s.logicalrelid)
AND l.classid = shardintervalindex
)
WHERE
logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010')
AND pid = pg_backend_pid()
AND l.locktype = 'advisory'
ORDER BY
1, 2, 3;
-- test shard resource locks with multi-shard UPDATE
BEGIN;
UPDATE partitioning_locks_2009 SET time = '2009-03-01';
-- see the locks on parent table
SELECT
logicalrelid,
locktype,
mode
FROM
pg_locks AS l JOIN pg_dist_shard AS s
ON
l.objid = s.shardid
WHERE
logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND
pid = pg_backend_pid()
ORDER BY
1, 2, 3;
logicalrelid | locktype | mode
SELECT * FROM lockinfo;
logicalrelid | locktype | mode
---------------------------------------------------------------------
partitioning_locks | advisory | ShareUpdateExclusiveLock
partitioning_locks | advisory | ShareUpdateExclusiveLock
partitioning_locks | advisory | ShareUpdateExclusiveLock
partitioning_locks | advisory | ShareUpdateExclusiveLock
partitioning_locks_2009 | advisory | ShareLock
partitioning_locks_2009 | advisory | ShareLock
partitioning_locks_2009 | advisory | ShareLock
partitioning_locks_2009 | advisory | ShareLock
partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock
partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock
partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock
partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock
(12 rows)
partitioning_locks | colocated_shards_metadata | ShareLock
partitioning_locks | colocated_shards_metadata | ShareLock
partitioning_locks | colocated_shards_metadata | ShareLock
partitioning_locks | colocated_shards_metadata | ShareLock
partitioning_locks | shard | ShareUpdateExclusiveLock
partitioning_locks | shard | ShareUpdateExclusiveLock
partitioning_locks | shard | ShareUpdateExclusiveLock
partitioning_locks | shard | ShareUpdateExclusiveLock
partitioning_locks_2009 | colocated_shards_metadata | ShareLock
partitioning_locks_2009 | colocated_shards_metadata | ShareLock
partitioning_locks_2009 | colocated_shards_metadata | ShareLock
partitioning_locks_2009 | colocated_shards_metadata | ShareLock
partitioning_locks_2009 | shard | ShareUpdateExclusiveLock
partitioning_locks_2009 | shard | ShareUpdateExclusiveLock
partitioning_locks_2009 | shard | ShareUpdateExclusiveLock
partitioning_locks_2009 | shard | ShareUpdateExclusiveLock
partitioning_locks_2010 | colocated_shards_metadata | ShareLock
partitioning_locks_2010 | colocated_shards_metadata | ShareLock
partitioning_locks_2010 | colocated_shards_metadata | ShareLock
partitioning_locks_2010 | colocated_shards_metadata | ShareLock
(20 rows)
COMMIT;
-- test shard resource locks with TRUNCATE
BEGIN;
TRUNCATE partitioning_locks_2009;
-- see the locks on parent table
SELECT
logicalrelid,
locktype,
mode
FROM
pg_locks AS l JOIN pg_dist_shard AS s
ON
l.objid = s.shardid
WHERE
logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND
pid = pg_backend_pid()
ORDER BY
1, 2, 3;
logicalrelid | locktype | mode
SELECT * FROM lockinfo;
logicalrelid | locktype | mode
---------------------------------------------------------------------
partitioning_locks_2009 | advisory | ShareLock
partitioning_locks_2009 | advisory | ShareLock
partitioning_locks_2009 | advisory | ShareLock
partitioning_locks_2009 | advisory | ShareLock
(4 rows)
partitioning_locks | colocated_shards_metadata | ShareLock
partitioning_locks | colocated_shards_metadata | ShareLock
partitioning_locks | colocated_shards_metadata | ShareLock
partitioning_locks | colocated_shards_metadata | ShareLock
partitioning_locks_2009 | colocated_shards_metadata | ShareLock
partitioning_locks_2009 | colocated_shards_metadata | ShareLock
partitioning_locks_2009 | colocated_shards_metadata | ShareLock
partitioning_locks_2009 | colocated_shards_metadata | ShareLock
partitioning_locks_2010 | colocated_shards_metadata | ShareLock
partitioning_locks_2010 | colocated_shards_metadata | ShareLock
partitioning_locks_2010 | colocated_shards_metadata | ShareLock
partitioning_locks_2010 | colocated_shards_metadata | ShareLock
(12 rows)
COMMIT;
-- test shard resource locks with INSERT/SELECT
BEGIN;
INSERT INTO partitioning_locks_2009 SELECT * FROM partitioning_locks WHERE time >= '2009-01-01' AND time < '2010-01-01';
-- see the locks on parent table
SELECT
logicalrelid,
locktype,
mode
FROM
pg_locks AS l JOIN pg_dist_shard AS s
ON
l.objid = s.shardid
WHERE
logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND
pid = pg_backend_pid()
ORDER BY
1, 2, 3;
logicalrelid | locktype | mode
SELECT * FROM lockinfo;
logicalrelid | locktype | mode
---------------------------------------------------------------------
partitioning_locks | advisory | ShareUpdateExclusiveLock
partitioning_locks | advisory | ShareUpdateExclusiveLock
partitioning_locks | advisory | ShareUpdateExclusiveLock
partitioning_locks | advisory | ShareUpdateExclusiveLock
partitioning_locks_2009 | advisory | ShareLock
partitioning_locks_2009 | advisory | ShareLock
partitioning_locks_2009 | advisory | ShareLock
partitioning_locks_2009 | advisory | ShareLock
partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock
partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock
partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock
partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock
(12 rows)
partitioning_locks | colocated_shards_metadata | ShareLock
partitioning_locks | colocated_shards_metadata | ShareLock
partitioning_locks | colocated_shards_metadata | ShareLock
partitioning_locks | colocated_shards_metadata | ShareLock
partitioning_locks | shard | ShareUpdateExclusiveLock
partitioning_locks | shard | ShareUpdateExclusiveLock
partitioning_locks | shard | ShareUpdateExclusiveLock
partitioning_locks | shard | ShareUpdateExclusiveLock
partitioning_locks_2009 | colocated_shards_metadata | ShareLock
partitioning_locks_2009 | colocated_shards_metadata | ShareLock
partitioning_locks_2009 | colocated_shards_metadata | ShareLock
partitioning_locks_2009 | colocated_shards_metadata | ShareLock
partitioning_locks_2009 | shard | ShareUpdateExclusiveLock
partitioning_locks_2009 | shard | ShareUpdateExclusiveLock
partitioning_locks_2009 | shard | ShareUpdateExclusiveLock
partitioning_locks_2009 | shard | ShareUpdateExclusiveLock
partitioning_locks_2010 | colocated_shards_metadata | ShareLock
partitioning_locks_2010 | colocated_shards_metadata | ShareLock
partitioning_locks_2010 | colocated_shards_metadata | ShareLock
partitioning_locks_2010 | colocated_shards_metadata | ShareLock
(20 rows)
COMMIT;
-- test partition-wise join
@ -1722,6 +1733,7 @@ SELECT success FROM run_command_on_workers('select pg_reload_conf()');
(2 rows)
RESET enable_partitionwise_join;
DROP VIEW lockinfo;
DROP TABLE
IF EXISTS
partitioning_test_2009,

View File

@ -69,6 +69,18 @@ SELECT master_apply_delete_command('DELETE FROM sharded_table');
ERROR: cannot delete from hash distributed table with this command
DETAIL: Delete statements on hash-partitioned tables are not supported with master_apply_delete_command.
HINT: Use the DELETE command instead.
SET citus.shard_count TO 4;
SET citus.next_shard_id TO 999001;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1400000;
CREATE TABLE lockable_table ( name text, id bigint );
SELECT create_distributed_table('lockable_table', 'id', 'hash', colocate_with := 'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SET citus.shard_count TO 2;
SET citus.next_shard_id TO 990002;
-- lock shard metadata: take some share locks and exclusive locks
BEGIN;
SELECT lock_shard_metadata(5, ARRAY[999001, 999002, 999002]);
@ -83,17 +95,26 @@ SELECT lock_shard_metadata(7, ARRAY[999001, 999003, 999004]);
(1 row)
SELECT locktype, objid, mode, granted
FROM pg_locks
WHERE objid IN (999001, 999002, 999003, 999004)
ORDER BY objid, mode;
locktype | objid | mode | granted
SELECT
CASE
WHEN l.objsubid = 5 THEN 'shard'
WHEN l.objsubid = 4 THEN 'shard_metadata'
ELSE 'colocated_shards_metadata'
END AS locktype,
objid,
classid,
mode,
granted
FROM pg_locks l
WHERE l.locktype = 'advisory'
ORDER BY locktype, objid, classid, mode;
locktype | objid | classid | mode | granted
---------------------------------------------------------------------
advisory | 999001 | ExclusiveLock | t
advisory | 999001 | ShareLock | t
advisory | 999002 | ShareLock | t
advisory | 999003 | ExclusiveLock | t
advisory | 999004 | ExclusiveLock | t
colocated_shards_metadata | 1400000 | 0 | ExclusiveLock | t
colocated_shards_metadata | 1400000 | 0 | ShareLock | t
colocated_shards_metadata | 1400000 | 1 | ShareLock | t
colocated_shards_metadata | 1400000 | 2 | ExclusiveLock | t
colocated_shards_metadata | 1400000 | 3 | ExclusiveLock | t
(5 rows)
END;
@ -102,11 +123,7 @@ SELECT lock_shard_metadata(0, ARRAY[990001, 999002]);
ERROR: unsupported lockmode 0
-- lock shard metadata: invalid shard ID
SELECT lock_shard_metadata(5, ARRAY[0]);
lock_shard_metadata
---------------------------------------------------------------------
(1 row)
ERROR: could not find valid entry for shard xxxxx
-- lock shard metadata: lock nothing
SELECT lock_shard_metadata(5, ARRAY[]::bigint[]);
ERROR: no locks specified
@ -153,6 +170,7 @@ SELECT lock_shard_resources(5, ARRAY[]::bigint[]);
ERROR: no locks specified
-- drop table
DROP TABLE sharded_table;
DROP TABLE lockable_table;
-- VACUUM tests
-- create a table with a single shard (for convenience)
SET citus.shard_count TO 1;

View File

@ -920,24 +920,36 @@ TRUNCATE partitioning_locks;
SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3;
COMMIT;
CREATE VIEW lockinfo AS
SELECT
logicalrelid,
CASE
WHEN l.objsubid = 5 THEN 'shard'
WHEN l.objsubid = 4 THEN 'shard_metadata'
ELSE 'colocated_shards_metadata'
END AS locktype,
mode
FROM
pg_locks AS l JOIN (select row_number() over (partition by logicalrelid order by shardminvalue) -1 as shardintervalindex, * from pg_dist_shard) AS s
ON
(l.objsubid IN (4, 5) AND l.objid = s.shardid )
OR (l.objsubid = 8
AND l.objid IN (select colocationid from pg_dist_partition AS p where p.logicalrelid = s.logicalrelid)
AND l.classid = shardintervalindex
)
WHERE
logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010')
AND pid = pg_backend_pid()
AND l.locktype = 'advisory'
ORDER BY
1, 2, 3;
-- test shard resource locks with multi-shard UPDATE
BEGIN;
UPDATE partitioning_locks_2009 SET time = '2009-03-01';
-- see the locks on parent table
SELECT
logicalrelid,
locktype,
mode
FROM
pg_locks AS l JOIN pg_dist_shard AS s
ON
l.objid = s.shardid
WHERE
logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND
pid = pg_backend_pid()
ORDER BY
1, 2, 3;
SELECT * FROM lockinfo;
COMMIT;
-- test shard resource locks with TRUNCATE
@ -945,19 +957,7 @@ BEGIN;
TRUNCATE partitioning_locks_2009;
-- see the locks on parent table
SELECT
logicalrelid,
locktype,
mode
FROM
pg_locks AS l JOIN pg_dist_shard AS s
ON
l.objid = s.shardid
WHERE
logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND
pid = pg_backend_pid()
ORDER BY
1, 2, 3;
SELECT * FROM lockinfo;
COMMIT;
-- test shard resource locks with INSERT/SELECT
@ -965,19 +965,7 @@ BEGIN;
INSERT INTO partitioning_locks_2009 SELECT * FROM partitioning_locks WHERE time >= '2009-01-01' AND time < '2010-01-01';
-- see the locks on parent table
SELECT
logicalrelid,
locktype,
mode
FROM
pg_locks AS l JOIN pg_dist_shard AS s
ON
l.objid = s.shardid
WHERE
logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND
pid = pg_backend_pid()
ORDER BY
1, 2, 3;
SELECT * FROM lockinfo;
COMMIT;
-- test partition-wise join
@ -1026,6 +1014,7 @@ SELECT success FROM run_command_on_workers('select pg_reload_conf()');
RESET enable_partitionwise_join;
DROP VIEW lockinfo;
DROP TABLE
IF EXISTS
partitioning_test_2009,

View File

@ -48,16 +48,32 @@ SELECT master_apply_delete_command('DELETE FROM sharded_table WHERE id > 0');
-- drop all shards
SELECT master_apply_delete_command('DELETE FROM sharded_table');
SET citus.shard_count TO 4;
SET citus.next_shard_id TO 999001;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1400000;
CREATE TABLE lockable_table ( name text, id bigint );
SELECT create_distributed_table('lockable_table', 'id', 'hash', colocate_with := 'none');
SET citus.shard_count TO 2;
SET citus.next_shard_id TO 990002;
-- lock shard metadata: take some share locks and exclusive locks
BEGIN;
SELECT lock_shard_metadata(5, ARRAY[999001, 999002, 999002]);
SELECT lock_shard_metadata(7, ARRAY[999001, 999003, 999004]);
SELECT locktype, objid, mode, granted
FROM pg_locks
WHERE objid IN (999001, 999002, 999003, 999004)
ORDER BY objid, mode;
SELECT
CASE
WHEN l.objsubid = 5 THEN 'shard'
WHEN l.objsubid = 4 THEN 'shard_metadata'
ELSE 'colocated_shards_metadata'
END AS locktype,
objid,
classid,
mode,
granted
FROM pg_locks l
WHERE l.locktype = 'advisory'
ORDER BY locktype, objid, classid, mode;
END;
-- lock shard metadata: unsupported lock type
@ -91,6 +107,7 @@ SELECT lock_shard_resources(5, ARRAY[]::bigint[]);
-- drop table
DROP TABLE sharded_table;
DROP TABLE lockable_table;
-- VACUUM tests