From e76281500cc646125a82efa34577987ba53b9276 Mon Sep 17 00:00:00 2001 From: Jelte Fennema Date: Thu, 16 Jan 2020 13:14:01 +0100 Subject: [PATCH] Replace shardId lock with lock on colocation+shardIntervalIndex (#3374) This new locking pattern makes sure that some deadlocks that could happen during rebalancing cannot occur anymore. --- .../distributed/master/master_create_shards.c | 16 -- src/backend/distributed/utils/resource_lock.c | 17 +- src/include/distributed/resource_lock.h | 11 +- .../regress/expected/multi_partitioning.out | 158 ++++++++++-------- src/test/regress/expected/multi_utilities.out | 48 ++++-- src/test/regress/sql/multi_partitioning.sql | 67 ++++---- src/test/regress/sql/multi_utilities.sql | 25 ++- 7 files changed, 193 insertions(+), 149 deletions(-) diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index 00429bd14..01037ff08 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -212,14 +212,6 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, text *minHashTokenText = IntegerToText(shardMinHashToken); text *maxHashTokenText = IntegerToText(shardMaxHashToken); - /* - * Grabbing the shard metadata lock isn't technically necessary since - * we already hold an exclusive lock on the partition table, but we'll - * acquire it for the sake of completeness. As we're adding new active - * placements, the mode must be exclusive. 
- */ - LockShardDistributionMetadata(shardId, ExclusiveLock); - InsertShardRow(distributedTableId, shardId, shardStorageType, minHashTokenText, maxHashTokenText); @@ -378,14 +370,6 @@ CreateReferenceTableShard(Oid distributedTableId) /* get the next shard id */ uint64 shardId = GetNextShardId(); - /* - * Grabbing the shard metadata lock isn't technically necessary since - * we already hold an exclusive lock on the partition table, but we'll - * acquire it for the sake of completeness. As we're adding new active - * placements, the mode must be exclusive. - */ - LockShardDistributionMetadata(shardId, ExclusiveLock); - InsertShardRow(distributedTableId, shardId, shardStorageType, shardMinValue, shardMaxValue); diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index bbaba01ff..4aa6ec19a 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -332,7 +332,22 @@ LockShardDistributionMetadata(int64 shardId, LOCKMODE lockMode) const bool sessionLock = false; const bool dontWait = false; - SET_LOCKTAG_SHARD_METADATA_RESOURCE(tag, MyDatabaseId, shardId); + ShardInterval *shardInterval = LoadShardInterval(shardId); + Oid distributedTableId = shardInterval->relationId; + DistTableCacheEntry *distributedTable = DistributedTableCacheEntry( + distributedTableId); + uint32 colocationId = distributedTable->colocationId; + + if (colocationId == INVALID_COLOCATION_ID || + distributedTable->partitionMethod != DISTRIBUTE_BY_HASH) + { + SET_LOCKTAG_SHARD_METADATA_RESOURCE(tag, MyDatabaseId, shardId); + } + else + { + SET_LOCKTAG_COLOCATED_SHARDS_METADATA_RESOURCE(tag, MyDatabaseId, colocationId, + shardInterval->shardIndex); + } (void) LockAcquire(&tag, lockMode, sessionLock, dontWait); } diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index e0ab4db7d..25f50f0c8 100644 --- a/src/include/distributed/resource_lock.h +++ 
b/src/include/distributed/resource_lock.h @@ -36,7 +36,8 @@ typedef enum AdvisoryLocktagClass ADV_LOCKTAG_CLASS_CITUS_SHARD_METADATA = 4, ADV_LOCKTAG_CLASS_CITUS_SHARD = 5, ADV_LOCKTAG_CLASS_CITUS_JOB = 6, - ADV_LOCKTAG_CLASS_CITUS_REBALANCE_COLOCATION = 7 + ADV_LOCKTAG_CLASS_CITUS_REBALANCE_COLOCATION = 7, + ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA = 8, } AdvisoryLocktagClass; @@ -48,6 +49,14 @@ typedef enum AdvisoryLocktagClass (uint32) (shardid), \ ADV_LOCKTAG_CLASS_CITUS_SHARD_METADATA) +#define SET_LOCKTAG_COLOCATED_SHARDS_METADATA_RESOURCE(tag, db, colocationId, \ + shardIntervalIndex) \ + SET_LOCKTAG_ADVISORY(tag, \ + db, \ + (uint32) shardIntervalIndex, \ + (uint32) colocationId, \ + ADV_LOCKTAG_CLASS_CITUS_COLOCATED_SHARDS_METADATA) + /* reuse advisory lock, but with different, unused field 4 (5)*/ #define SET_LOCKTAG_SHARD_RESOURCE(tag, db, shardid) \ SET_LOCKTAG_ADVISORY(tag, \ diff --git a/src/test/regress/expected/multi_partitioning.out b/src/test/regress/expected/multi_partitioning.out index 26db24c0b..675ee4301 100644 --- a/src/test/regress/expected/multi_partitioning.out +++ b/src/test/regress/expected/multi_partitioning.out @@ -1424,98 +1424,109 @@ SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass (8 rows) COMMIT; +CREATE VIEW lockinfo AS + SELECT + logicalrelid, + CASE + WHEN l.objsubid = 5 THEN 'shard' + WHEN l.objsubid = 4 THEN 'shard_metadata' + ELSE 'colocated_shards_metadata' + END AS locktype, + mode + FROM + pg_locks AS l JOIN (select row_number() over (partition by logicalrelid order by shardminvalue) -1 as shardintervalindex, * from pg_dist_shard) AS s + ON + (l.objsubid IN (4, 5) AND l.objid = s.shardid ) + OR (l.objsubid = 8 + AND l.objid IN (select colocationid from pg_dist_partition AS p where p.logicalrelid = s.logicalrelid) + AND l.classid = shardintervalindex + ) + WHERE + logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') + AND pid = pg_backend_pid() 
+ AND l.locktype = 'advisory' + ORDER BY + 1, 2, 3; -- test shard resource locks with multi-shard UPDATE BEGIN; UPDATE partitioning_locks_2009 SET time = '2009-03-01'; -- see the locks on parent table -SELECT - logicalrelid, - locktype, - mode -FROM - pg_locks AS l JOIN pg_dist_shard AS s -ON - l.objid = s.shardid -WHERE - logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND - pid = pg_backend_pid() -ORDER BY - 1, 2, 3; - logicalrelid | locktype | mode +SELECT * FROM lockinfo; + logicalrelid | locktype | mode --------------------------------------------------------------------- - partitioning_locks | advisory | ShareUpdateExclusiveLock - partitioning_locks | advisory | ShareUpdateExclusiveLock - partitioning_locks | advisory | ShareUpdateExclusiveLock - partitioning_locks | advisory | ShareUpdateExclusiveLock - partitioning_locks_2009 | advisory | ShareLock - partitioning_locks_2009 | advisory | ShareLock - partitioning_locks_2009 | advisory | ShareLock - partitioning_locks_2009 | advisory | ShareLock - partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock - partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock - partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock - partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock -(12 rows) + partitioning_locks | colocated_shards_metadata | ShareLock + partitioning_locks | colocated_shards_metadata | ShareLock + partitioning_locks | colocated_shards_metadata | ShareLock + partitioning_locks | colocated_shards_metadata | ShareLock + partitioning_locks | shard | ShareUpdateExclusiveLock + partitioning_locks | shard | ShareUpdateExclusiveLock + partitioning_locks | shard | ShareUpdateExclusiveLock + partitioning_locks | shard | ShareUpdateExclusiveLock + partitioning_locks_2009 | colocated_shards_metadata | ShareLock + partitioning_locks_2009 | colocated_shards_metadata | ShareLock + partitioning_locks_2009 | colocated_shards_metadata | ShareLock + 
partitioning_locks_2009 | colocated_shards_metadata | ShareLock + partitioning_locks_2009 | shard | ShareUpdateExclusiveLock + partitioning_locks_2009 | shard | ShareUpdateExclusiveLock + partitioning_locks_2009 | shard | ShareUpdateExclusiveLock + partitioning_locks_2009 | shard | ShareUpdateExclusiveLock + partitioning_locks_2010 | colocated_shards_metadata | ShareLock + partitioning_locks_2010 | colocated_shards_metadata | ShareLock + partitioning_locks_2010 | colocated_shards_metadata | ShareLock + partitioning_locks_2010 | colocated_shards_metadata | ShareLock +(20 rows) COMMIT; -- test shard resource locks with TRUNCATE BEGIN; TRUNCATE partitioning_locks_2009; -- see the locks on parent table -SELECT - logicalrelid, - locktype, - mode -FROM - pg_locks AS l JOIN pg_dist_shard AS s -ON - l.objid = s.shardid -WHERE - logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND - pid = pg_backend_pid() -ORDER BY - 1, 2, 3; - logicalrelid | locktype | mode +SELECT * FROM lockinfo; + logicalrelid | locktype | mode --------------------------------------------------------------------- - partitioning_locks_2009 | advisory | ShareLock - partitioning_locks_2009 | advisory | ShareLock - partitioning_locks_2009 | advisory | ShareLock - partitioning_locks_2009 | advisory | ShareLock -(4 rows) + partitioning_locks | colocated_shards_metadata | ShareLock + partitioning_locks | colocated_shards_metadata | ShareLock + partitioning_locks | colocated_shards_metadata | ShareLock + partitioning_locks | colocated_shards_metadata | ShareLock + partitioning_locks_2009 | colocated_shards_metadata | ShareLock + partitioning_locks_2009 | colocated_shards_metadata | ShareLock + partitioning_locks_2009 | colocated_shards_metadata | ShareLock + partitioning_locks_2009 | colocated_shards_metadata | ShareLock + partitioning_locks_2010 | colocated_shards_metadata | ShareLock + partitioning_locks_2010 | colocated_shards_metadata | ShareLock + 
partitioning_locks_2010 | colocated_shards_metadata | ShareLock + partitioning_locks_2010 | colocated_shards_metadata | ShareLock +(12 rows) COMMIT; -- test shard resource locks with INSERT/SELECT BEGIN; INSERT INTO partitioning_locks_2009 SELECT * FROM partitioning_locks WHERE time >= '2009-01-01' AND time < '2010-01-01'; -- see the locks on parent table -SELECT - logicalrelid, - locktype, - mode -FROM - pg_locks AS l JOIN pg_dist_shard AS s -ON - l.objid = s.shardid -WHERE - logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND - pid = pg_backend_pid() -ORDER BY - 1, 2, 3; - logicalrelid | locktype | mode +SELECT * FROM lockinfo; + logicalrelid | locktype | mode --------------------------------------------------------------------- - partitioning_locks | advisory | ShareUpdateExclusiveLock - partitioning_locks | advisory | ShareUpdateExclusiveLock - partitioning_locks | advisory | ShareUpdateExclusiveLock - partitioning_locks | advisory | ShareUpdateExclusiveLock - partitioning_locks_2009 | advisory | ShareLock - partitioning_locks_2009 | advisory | ShareLock - partitioning_locks_2009 | advisory | ShareLock - partitioning_locks_2009 | advisory | ShareLock - partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock - partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock - partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock - partitioning_locks_2009 | advisory | ShareUpdateExclusiveLock -(12 rows) + partitioning_locks | colocated_shards_metadata | ShareLock + partitioning_locks | colocated_shards_metadata | ShareLock + partitioning_locks | colocated_shards_metadata | ShareLock + partitioning_locks | colocated_shards_metadata | ShareLock + partitioning_locks | shard | ShareUpdateExclusiveLock + partitioning_locks | shard | ShareUpdateExclusiveLock + partitioning_locks | shard | ShareUpdateExclusiveLock + partitioning_locks | shard | ShareUpdateExclusiveLock + partitioning_locks_2009 | 
colocated_shards_metadata | ShareLock + partitioning_locks_2009 | colocated_shards_metadata | ShareLock + partitioning_locks_2009 | colocated_shards_metadata | ShareLock + partitioning_locks_2009 | colocated_shards_metadata | ShareLock + partitioning_locks_2009 | shard | ShareUpdateExclusiveLock + partitioning_locks_2009 | shard | ShareUpdateExclusiveLock + partitioning_locks_2009 | shard | ShareUpdateExclusiveLock + partitioning_locks_2009 | shard | ShareUpdateExclusiveLock + partitioning_locks_2010 | colocated_shards_metadata | ShareLock + partitioning_locks_2010 | colocated_shards_metadata | ShareLock + partitioning_locks_2010 | colocated_shards_metadata | ShareLock + partitioning_locks_2010 | colocated_shards_metadata | ShareLock +(20 rows) COMMIT; -- test partition-wise join @@ -1722,6 +1733,7 @@ SELECT success FROM run_command_on_workers('select pg_reload_conf()'); (2 rows) RESET enable_partitionwise_join; +DROP VIEW lockinfo; DROP TABLE IF EXISTS partitioning_test_2009, diff --git a/src/test/regress/expected/multi_utilities.out b/src/test/regress/expected/multi_utilities.out index aed692172..240b63061 100644 --- a/src/test/regress/expected/multi_utilities.out +++ b/src/test/regress/expected/multi_utilities.out @@ -69,6 +69,18 @@ SELECT master_apply_delete_command('DELETE FROM sharded_table'); ERROR: cannot delete from hash distributed table with this command DETAIL: Delete statements on hash-partitioned tables are not supported with master_apply_delete_command. HINT: Use the DELETE command instead. 
+SET citus.shard_count TO 4; +SET citus.next_shard_id TO 999001; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1400000; +CREATE TABLE lockable_table ( name text, id bigint ); +SELECT create_distributed_table('lockable_table', 'id', 'hash', colocate_with := 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SET citus.shard_count TO 2; +SET citus.next_shard_id TO 990002; -- lock shard metadata: take some share locks and exclusive locks BEGIN; SELECT lock_shard_metadata(5, ARRAY[999001, 999002, 999002]); @@ -83,17 +95,26 @@ SELECT lock_shard_metadata(7, ARRAY[999001, 999003, 999004]); (1 row) -SELECT locktype, objid, mode, granted -FROM pg_locks -WHERE objid IN (999001, 999002, 999003, 999004) -ORDER BY objid, mode; - locktype | objid | mode | granted +SELECT + CASE + WHEN l.objsubid = 5 THEN 'shard' + WHEN l.objsubid = 4 THEN 'shard_metadata' + ELSE 'colocated_shards_metadata' + END AS locktype, + objid, + classid, + mode, + granted +FROM pg_locks l +WHERE l.locktype = 'advisory' +ORDER BY locktype, objid, classid, mode; + locktype | objid | classid | mode | granted --------------------------------------------------------------------- - advisory | 999001 | ExclusiveLock | t - advisory | 999001 | ShareLock | t - advisory | 999002 | ShareLock | t - advisory | 999003 | ExclusiveLock | t - advisory | 999004 | ExclusiveLock | t + colocated_shards_metadata | 1400000 | 0 | ExclusiveLock | t + colocated_shards_metadata | 1400000 | 0 | ShareLock | t + colocated_shards_metadata | 1400000 | 1 | ShareLock | t + colocated_shards_metadata | 1400000 | 2 | ExclusiveLock | t + colocated_shards_metadata | 1400000 | 3 | ExclusiveLock | t (5 rows) END; @@ -102,11 +123,7 @@ SELECT lock_shard_metadata(0, ARRAY[990001, 999002]); ERROR: unsupported lockmode 0 -- lock shard metadata: invalid shard ID SELECT lock_shard_metadata(5, ARRAY[0]); - lock_shard_metadata 
---------------------------------------------------------------------- - -(1 row) - +ERROR: could not find valid entry for shard xxxxx -- lock shard metadata: lock nothing SELECT lock_shard_metadata(5, ARRAY[]::bigint[]); ERROR: no locks specified @@ -153,6 +170,7 @@ SELECT lock_shard_resources(5, ARRAY[]::bigint[]); ERROR: no locks specified -- drop table DROP TABLE sharded_table; +DROP TABLE lockable_table; -- VACUUM tests -- create a table with a single shard (for convenience) SET citus.shard_count TO 1; diff --git a/src/test/regress/sql/multi_partitioning.sql b/src/test/regress/sql/multi_partitioning.sql index cd8fd3c48..168189f09 100644 --- a/src/test/regress/sql/multi_partitioning.sql +++ b/src/test/regress/sql/multi_partitioning.sql @@ -920,24 +920,36 @@ TRUNCATE partitioning_locks; SELECT relation::regclass, locktype, mode FROM pg_locks WHERE relation::regclass::text LIKE 'partitioning_locks%' AND pid = pg_backend_pid() ORDER BY 1, 2, 3; COMMIT; +CREATE VIEW lockinfo AS + SELECT + logicalrelid, + CASE + WHEN l.objsubid = 5 THEN 'shard' + WHEN l.objsubid = 4 THEN 'shard_metadata' + ELSE 'colocated_shards_metadata' + END AS locktype, + mode + FROM + pg_locks AS l JOIN (select row_number() over (partition by logicalrelid order by shardminvalue) -1 as shardintervalindex, * from pg_dist_shard) AS s + ON + (l.objsubid IN (4, 5) AND l.objid = s.shardid ) + OR (l.objsubid = 8 + AND l.objid IN (select colocationid from pg_dist_partition AS p where p.logicalrelid = s.logicalrelid) + AND l.classid = shardintervalindex + ) + WHERE + logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') + AND pid = pg_backend_pid() + AND l.locktype = 'advisory' + ORDER BY + 1, 2, 3; + -- test shard resource locks with multi-shard UPDATE BEGIN; UPDATE partitioning_locks_2009 SET time = '2009-03-01'; -- see the locks on parent table -SELECT - logicalrelid, - locktype, - mode -FROM - pg_locks AS l JOIN pg_dist_shard AS s -ON - l.objid = s.shardid 
-WHERE - logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND - pid = pg_backend_pid() -ORDER BY - 1, 2, 3; +SELECT * FROM lockinfo; COMMIT; -- test shard resource locks with TRUNCATE @@ -945,19 +957,7 @@ BEGIN; TRUNCATE partitioning_locks_2009; -- see the locks on parent table -SELECT - logicalrelid, - locktype, - mode -FROM - pg_locks AS l JOIN pg_dist_shard AS s -ON - l.objid = s.shardid -WHERE - logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND - pid = pg_backend_pid() -ORDER BY - 1, 2, 3; +SELECT * FROM lockinfo; COMMIT; -- test shard resource locks with INSERT/SELECT @@ -965,19 +965,7 @@ BEGIN; INSERT INTO partitioning_locks_2009 SELECT * FROM partitioning_locks WHERE time >= '2009-01-01' AND time < '2010-01-01'; -- see the locks on parent table -SELECT - logicalrelid, - locktype, - mode -FROM - pg_locks AS l JOIN pg_dist_shard AS s -ON - l.objid = s.shardid -WHERE - logicalrelid IN ('partitioning_locks', 'partitioning_locks_2009', 'partitioning_locks_2010') AND - pid = pg_backend_pid() -ORDER BY - 1, 2, 3; +SELECT * FROM lockinfo; COMMIT; -- test partition-wise join @@ -1026,6 +1014,7 @@ SELECT success FROM run_command_on_workers('select pg_reload_conf()'); RESET enable_partitionwise_join; +DROP VIEW lockinfo; DROP TABLE IF EXISTS partitioning_test_2009, diff --git a/src/test/regress/sql/multi_utilities.sql b/src/test/regress/sql/multi_utilities.sql index 38cb1fc7b..caf2a9895 100644 --- a/src/test/regress/sql/multi_utilities.sql +++ b/src/test/regress/sql/multi_utilities.sql @@ -48,16 +48,32 @@ SELECT master_apply_delete_command('DELETE FROM sharded_table WHERE id > 0'); -- drop all shards SELECT master_apply_delete_command('DELETE FROM sharded_table'); +SET citus.shard_count TO 4; +SET citus.next_shard_id TO 999001; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1400000; +CREATE TABLE lockable_table ( name text, id bigint ); +SELECT 
create_distributed_table('lockable_table', 'id', 'hash', colocate_with := 'none'); +SET citus.shard_count TO 2; +SET citus.next_shard_id TO 990002; -- lock shard metadata: take some share locks and exclusive locks BEGIN; SELECT lock_shard_metadata(5, ARRAY[999001, 999002, 999002]); SELECT lock_shard_metadata(7, ARRAY[999001, 999003, 999004]); -SELECT locktype, objid, mode, granted -FROM pg_locks -WHERE objid IN (999001, 999002, 999003, 999004) -ORDER BY objid, mode; +SELECT + CASE + WHEN l.objsubid = 5 THEN 'shard' + WHEN l.objsubid = 4 THEN 'shard_metadata' + ELSE 'colocated_shards_metadata' + END AS locktype, + objid, + classid, + mode, + granted +FROM pg_locks l +WHERE l.locktype = 'advisory' +ORDER BY locktype, objid, classid, mode; END; -- lock shard metadata: unsupported lock type @@ -91,6 +107,7 @@ SELECT lock_shard_resources(5, ARRAY[]::bigint[]); -- drop table DROP TABLE sharded_table; +DROP TABLE lockable_table; -- VACUUM tests