Fix reference table lock contention (#6173)

DESCRIPTION: Fix reference table lock contention

Dropping and creating reference tables unintentionally blocked on each other, because both the drop path and the conditional copying of existing reference tables to (new) nodes acquired an `ExclusiveLock`.

The patch does the following:
 - Lower the lock level for dropping (reference) tables to `ShareLock` so drops don't conflict with each other
 - Treat reference tables and distributed tables equally and acquire the colocation lock when dropping any table that is in a colocation group
 - Perform the precondition check for copying reference tables twice: the first time with a lower lock that doesn't conflict with anything. This could have been `NoLock`; however, in preparation for dropping a colocation group, it is an `AccessShareLock`

During normal operation the first check will always pass and we don't have to escalate that lock, meaning that adding and removing reference tables won't block on each other. Only after a node addition will the first `create_reference_table` still need to acquire an `ExclusiveLock` on the colocation group to perform the copy (see the sketch below).
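For illustration, the check-then-escalate pattern can be sketched as follows. This is a minimal, self-contained sketch of the pattern only, not the Citus implementation: AcquireColocationLock, ReleaseColocationLock, CopyIsNeeded and CopyReferenceTables are hypothetical stand-ins for LockColocationId/UnlockColocationId and the precondition check (WorkersWithoutReferenceTablePlacement).

#include <stdbool.h>
#include <stdio.h>

typedef enum { ACCESS_SHARE_LOCK, EXCLUSIVE_LOCK } LockMode;

/* hypothetical stand-ins for the Citus locking and precondition primitives */
static void AcquireColocationLock(LockMode mode) { printf("lock %d\n", mode); }
static void ReleaseColocationLock(LockMode mode) { printf("unlock %d\n", mode); }
static bool CopyIsNeeded(void) { return false; /* the common case */ }
static void CopyReferenceTables(void) { printf("copying reference tables\n"); }

static void
EnsureReferenceTablesSketch(void)
{
	/* first pass takes a weak lock, second pass a self-conflicting one */
	const LockMode lockModes[] = { ACCESS_SHARE_LOCK, EXCLUSIVE_LOCK };
	const int lockModeCount = sizeof(lockModes) / sizeof(lockModes[0]);

	for (int i = 0; i < lockModeCount; i++)
	{
		AcquireColocationLock(lockModes[i]);

		if (!CopyIsNeeded())
		{
			/* common case: nothing to copy, release all locks in reverse order */
			for (int j = i; j >= 0; j--)
			{
				ReleaseColocationLock(lockModes[j]);
			}
			return;
		}

		/*
		 * The precondition indicated a copy might be needed; the next
		 * iteration re-checks it under the stronger lock, so concurrent
		 * callers serialize before doing any work.
		 */
	}

	/* both locks are held here: copy safely, then release in reverse order */
	CopyReferenceTables();
	for (int j = lockModeCount - 1; j >= 0; j--)
	{
		ReleaseColocationLock(lockModes[j]);
	}
}

int
main(void)
{
	EnsureReferenceTablesSketch();
	return 0;
}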
Nils Dijk 2022-08-17 18:19:28 +02:00 committed by GitHub
parent 0631e1998b
commit a9d47a96f6
8 changed files with 357 additions and 67 deletions


@@ -59,6 +59,7 @@
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/shared_library_init.h"
#include "distributed/shard_rebalancer.h"
#include "distributed/worker_protocol.h"
@@ -471,9 +472,22 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName,
/*
* Make sure that existing reference tables have been replicated to all the nodes
* such that we can create foreign keys and joins work immediately after creation.
*
* This will take a lock on the nodes to make sure no nodes are added after we have
* verified and ensured the reference tables are copied everywhere.
* Although copying the reference tables here is only strictly needed when creating
* a new colocation group, avoiding the copy in the other cases requires significant
* refactoring which we don't want to perform now.
*/
EnsureReferenceTablesExistOnAllNodes();
/*
* While adding tables to a colocation group we need to make sure no concurrent
* mutations happen on the colocation group with regard to its placements. It is
* important that we have already copied any reference tables before acquiring this
* lock as these are competing operations.
*/
LockColocationId(colocationId, ShareLock);
/* we need to calculate these variables before creating distributed metadata */
bool localTableEmpty = TableEmpty(relationId);
Oid colocatedTableId = ColocatedTableId(colocationId);


@@ -153,11 +153,14 @@ PreprocessDropTableStmt(Node *node, const char *queryString,
continue;
}
if (IsCitusTableType(relationId, REFERENCE_TABLE))
/*
* While changing the tables that are part of a colocation group we need to
* prevent concurrent mutations to the placements of the shard groups.
*/
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
if (cacheEntry->colocationId != INVALID_COLOCATION_ID)
{
/* prevent concurrent EnsureReferenceTablesExistOnAllNodes */
int colocationId = CreateReferenceTableColocationId();
LockColocationId(colocationId, ExclusiveLock);
LockColocationId(cacheEntry->colocationId, ShareLock);
}
/* invalidate foreign key cache if the table involved in any foreign key */


@@ -365,7 +365,7 @@ CreateReferenceTableShard(Oid distributedTableId)
List *nodeList = ReferenceTablePlacementNodeList(ShareLock);
nodeList = SortList(nodeList, CompareWorkerNodes);
int replicationFactor = ReferenceTableReplicationFactor();
int replicationFactor = list_length(nodeList);
/* get the next shard id */
uint64 shardId = GetNextShardId();


@@ -64,8 +64,17 @@ replicate_reference_tables(PG_FUNCTION_ARGS)
{
Oid shardReplicationModeOid = PG_GETARG_OID(0);
char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
/* to prevent concurrent node additions while copying reference tables */
LockRelationOid(DistNodeRelationId(), ShareLock);
EnsureReferenceTablesExistOnAllNodesExtended(shardReplicationMode);
/*
* Given the copying of reference tables and updating metadata have been done via a
* loopback connection we do not have to retain the lock on pg_dist_node anymore.
*/
UnlockRelationOid(DistNodeRelationId(), ShareLock);
PG_RETURN_VOID();
}
@@ -95,56 +104,85 @@ EnsureReferenceTablesExistOnAllNodes(void)
void
EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
{
/*
* Prevent this function from running concurrently with itself.
*
* It also prevents concurrent DROP TABLE or DROP SCHEMA. We need this
* because through-out this function we assume values in referenceTableIdList
* are still valid.
*
* We don't need to handle other kinds of reference table DML/DDL here, since
* master_copy_shard_placement gets enough locks for that.
*
* We also don't need special handling for concurrent create_reference_table.
* Since that will trigger a call to this function from another backend,
* which will block until our call is finished.
*/
List *referenceTableIdList = NIL;
uint64 shardId = INVALID_SHARD_ID;
List *newWorkersList = NIL;
const char *referenceTableName = NULL;
int colocationId = CreateReferenceTableColocationId();
LockColocationId(colocationId, ExclusiveLock);
List *referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE);
if (referenceTableIdList == NIL)
{
/* no reference tables exist */
UnlockColocationId(colocationId, ExclusiveLock);
return;
}
Oid referenceTableId = linitial_oid(referenceTableIdList);
const char *referenceTableName = get_rel_name(referenceTableId);
List *shardIntervalList = LoadShardIntervalList(referenceTableId);
if (list_length(shardIntervalList) == 0)
{
/* check for corrupt metadata */
ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard",
referenceTableName)));
}
ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
uint64 shardId = shardInterval->shardId;
/*
* We only take an access share lock, otherwise we'll hold up citus_add_node.
* In case of create_reference_table() where we don't want concurrent writes
* to pg_dist_node, we have already acquired ShareLock on pg_dist_node.
* Most of the time this function should result in a conclusion where we do not need
* to copy any reference tables. To prevent excessive locking the majority of the time
* we run our precondition checks first with a lower lock. If, after checking with
* the lower lock, it turns out that we might need to copy reference tables, we check
* again with a more aggressive and self conflicting lock. It is important for the
* second run to be self conflicting, to make sure that two concurrent calls to this
* routine will not actually run concurrently after the initial check.
*
* If after two iterations of precondition checks we still find the need for copying
* reference tables we exit the loop with all locks held. This will prevent concurrent
* DROP TABLE and create_reference_table calls so that the list of reference tables we
* operate on is stable.
*
* Since the changes to the reference table placements are made via loopback
* connections we release the locks held at the end of this function. Due to Citus
* only running transactions in READ COMMITTED mode we can be sure that other
* transactions correctly find the metadata entries.
*/
List *newWorkersList = WorkersWithoutReferenceTablePlacement(shardId,
AccessShareLock);
if (list_length(newWorkersList) == 0)
LOCKMODE lockmodes[] = { AccessShareLock, ExclusiveLock };
for (int lockmodeIndex = 0; lockmodeIndex < lengthof(lockmodes); lockmodeIndex++)
{
/* nothing to do, no need for lock */
UnlockColocationId(colocationId, ExclusiveLock);
return;
LockColocationId(colocationId, lockmodes[lockmodeIndex]);
referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE);
if (referenceTableIdList == NIL)
{
/*
* No reference tables exist, make sure that any locks obtained earlier are
* released. It will probably not matter, but we release the locks in the
* reverse order we obtained them in.
*/
for (int releaseLockmodeIndex = lockmodeIndex; releaseLockmodeIndex >= 0;
releaseLockmodeIndex--)
{
UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]);
}
return;
}
Oid referenceTableId = linitial_oid(referenceTableIdList);
referenceTableName = get_rel_name(referenceTableId);
List *shardIntervalList = LoadShardIntervalList(referenceTableId);
if (list_length(shardIntervalList) == 0)
{
/* check for corrupt metadata */
ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard",
referenceTableName)));
}
ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
shardId = shardInterval->shardId;
/*
* We only take an access share lock, otherwise we'll hold up citus_add_node.
* In case of create_reference_table() where we don't want concurrent writes
* to pg_dist_node, we have already acquired ShareLock on pg_dist_node.
*/
newWorkersList = WorkersWithoutReferenceTablePlacement(shardId, AccessShareLock);
if (list_length(newWorkersList) == 0)
{
/*
* All workers already have a copy of the reference tables; make sure that
* any locks obtained earlier are released. It will probably not matter, but
* we release the locks in the reverse order we obtained them in.
*/
for (int releaseLockmodeIndex = lockmodeIndex; releaseLockmodeIndex >= 0;
releaseLockmodeIndex--)
{
UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]);
}
return;
}
}
/*
@@ -234,10 +272,17 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
}
/*
* Unblock other backends, they will probably observe that there are no
* more worker nodes without placements, unless nodes were added concurrently
* Since the reference tables have been copied via a loopback connection we do not
* have to retain our locks. Because Citus only runs transactions in READ COMMITTED
* mode we can be sure that other transactions will find the reference tables copied.
* We have obtained and held multiple locks; here we unlock them all in the reverse
* order we obtained them in.
*/
UnlockColocationId(colocationId, ExclusiveLock);
for (int releaseLockmodeIndex = lengthof(lockmodes) - 1; releaseLockmodeIndex >= 0;
releaseLockmodeIndex--)
{
UnlockColocationId(colocationId, lockmodes[releaseLockmodeIndex]);
}
}
@@ -424,6 +469,28 @@ CreateReferenceTableColocationId()
}
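/*
 * GetReferenceTableColocationId returns the colocation id used by the reference
 * tables in the cluster, or INVALID_COLOCATION_ID if no such colocation group
 * exists yet.
 */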
uint32
GetReferenceTableColocationId()
{
int shardCount = 1;
Oid distributionColumnType = InvalidOid;
Oid distributionColumnCollation = InvalidOid;
/*
* We don't maintain the replication factor of reference tables anymore and
* just use -1 instead. The value is not used anywhere.
*/
int replicationFactor = -1;
/* check for existing colocations */
uint32 colocationId =
ColocationId(shardCount, replicationFactor, distributionColumnType,
distributionColumnCollation);
return colocationId;
}
/*
* DeleteAllReplicatedTablePlacementsFromNodeGroup function iterates over
* list of reference and replicated hash distributed tables and deletes
@@ -528,19 +595,6 @@ CompareOids(const void *leftElement, const void *rightElement)
}
/*
* ReferenceTableReplicationFactor returns the replication factor for
* reference tables.
*/
int
ReferenceTableReplicationFactor(void)
{
List *nodeList = ReferenceTablePlacementNodeList(NoLock);
int replicationFactor = list_length(nodeList);
return replicationFactor;
}
/*
* ReplicateAllReferenceTablesToNode function finds all reference tables and
* replicates them to the given worker node. It also modifies pg_dist_colocation
@@ -551,6 +605,16 @@ ReferenceTableReplicationFactor(void)
void
ReplicateAllReferenceTablesToNode(WorkerNode *workerNode)
{
int colocationId = GetReferenceTableColocationId();
if (colocationId == INVALID_COLOCATION_ID)
{
/* no reference tables in system */
return;
}
/* prevent changes in table set while replicating reference tables */
LockColocationId(colocationId, RowExclusiveLock);
List *referenceTableList = CitusTableTypeIdList(REFERENCE_TABLE);
/* if there is no reference table, we do not need to replicate anything */


@@ -21,10 +21,10 @@
extern void EnsureReferenceTablesExistOnAllNodes(void);
extern void EnsureReferenceTablesExistOnAllNodesExtended(char transferMode);
extern uint32 CreateReferenceTableColocationId(void);
extern uint32 GetReferenceTableColocationId(void);
extern void DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId,
bool localOnly);
extern int CompareOids(const void *leftElement, const void *rightElement);
extern int ReferenceTableReplicationFactor(void);
extern void ReplicateAllReferenceTablesToNode(WorkerNode *workerNode);
#endif /* REFERENCE_TABLE_UTILS_H_ */
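For context, here is a minimal usage sketch of the new GetReferenceTableColocationId() declared above, mirroring the guard that ReplicateAllReferenceTablesToNode uses in this patch. The wrapping helper name and the exact include set are illustrative assumptions, not part of the commit.

#include "distributed/colocation_utils.h"      /* INVALID_COLOCATION_ID */
#include "distributed/reference_table_utils.h" /* GetReferenceTableColocationId */
#include "distributed/resource_lock.h"         /* LockColocationId */

/* hypothetical helper: lock the reference table colocation group, if one exists */
static void
LockReferenceTableColocationIfAny(void)
{
	uint32 colocationId = GetReferenceTableColocationId();
	if (colocationId == INVALID_COLOCATION_ID)
	{
		/* no reference tables in the system, nothing to lock */
		return;
	}

	/* prevent changes in the table set while we operate on the group */
	LockColocationId(colocationId, RowExclusiveLock);
}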


@@ -0,0 +1,137 @@
Parsed test spec with 2 sessions
starting permutation: s1-begin s1-create s2-create s1-commit
create_reference_table
---------------------------------------------------------------------
(1 row)
step s1-begin:
BEGIN;
step s1-create:
CREATE TABLE reference_table_s1(a int);
SELECT create_reference_table('reference_table_s1');
create_reference_table
---------------------------------------------------------------------
(1 row)
step s2-create:
CREATE TABLE reference_table_s2(a int);
SELECT create_reference_table('reference_table_s2');
create_reference_table
---------------------------------------------------------------------
(1 row)
step s1-commit:
COMMIT;
starting permutation: s1-create s2-create s1-begin s1-drop s2-drop s1-commit
create_reference_table
---------------------------------------------------------------------
(1 row)
step s1-create:
CREATE TABLE reference_table_s1(a int);
SELECT create_reference_table('reference_table_s1');
create_reference_table
---------------------------------------------------------------------
(1 row)
step s2-create:
CREATE TABLE reference_table_s2(a int);
SELECT create_reference_table('reference_table_s2');
create_reference_table
---------------------------------------------------------------------
(1 row)
step s1-begin:
BEGIN;
step s1-drop:
DROP TABLE reference_table_s1;
step s2-drop:
DROP TABLE reference_table_s2;
step s1-commit:
COMMIT;
starting permutation: s1-create s2-begin s2-create s1-drop s2-commit
create_reference_table
---------------------------------------------------------------------
(1 row)
step s1-create:
CREATE TABLE reference_table_s1(a int);
SELECT create_reference_table('reference_table_s1');
create_reference_table
---------------------------------------------------------------------
(1 row)
step s2-begin:
BEGIN;
step s2-create:
CREATE TABLE reference_table_s2(a int);
SELECT create_reference_table('reference_table_s2');
create_reference_table
---------------------------------------------------------------------
(1 row)
step s1-drop:
DROP TABLE reference_table_s1;
step s2-commit:
COMMIT;
starting permutation: s2-create s2-begin s2-drop s1-create s2-commit
create_reference_table
---------------------------------------------------------------------
(1 row)
step s2-create:
CREATE TABLE reference_table_s2(a int);
SELECT create_reference_table('reference_table_s2');
create_reference_table
---------------------------------------------------------------------
(1 row)
step s2-begin:
BEGIN;
step s2-drop:
DROP TABLE reference_table_s2;
step s1-create:
CREATE TABLE reference_table_s1(a int);
SELECT create_reference_table('reference_table_s1');
create_reference_table
---------------------------------------------------------------------
(1 row)
step s2-commit:
COMMIT;


@@ -69,6 +69,7 @@ test: isolation_undistribute_table
test: isolation_fix_partition_shard_index_names
test: isolation_global_pid
test: isolation_citus_locks
test: isolation_reference_table
# Rebalancer
test: isolation_blocking_move_single_shard_commands


@@ -0,0 +1,71 @@
// Reference table operations _do_ take a lock on the first reference table in the shardgroup, due to the lack of
// shardgroups in the system. When we run the tests we want to make sure the tables we are testing against cannot be
// the first reference table. For this purpose we create a reference table that we will _not_ interact with during
// the tests.
setup
{
CREATE TABLE first_reference_table(a int);
SELECT create_reference_table('first_reference_table');
}
teardown
{
DROP TABLE first_reference_table;
DROP TABLE IF EXISTS reference_table_s1;
DROP TABLE IF EXISTS reference_table_s2;
}
session "s1"
step "s1-begin"
{
BEGIN;
}
step "s1-create"
{
CREATE TABLE reference_table_s1(a int);
SELECT create_reference_table('reference_table_s1');
}
step "s1-drop"
{
DROP TABLE reference_table_s1;
}
step "s1-commit"
{
COMMIT;
}
session "s2"
step "s2-begin"
{
BEGIN;
}
step "s2-create"
{
CREATE TABLE reference_table_s2(a int);
SELECT create_reference_table('reference_table_s2');
}
step "s2-drop"
{
DROP TABLE reference_table_s2;
}
step "s2-commit"
{
COMMIT;
}
// creates don't block each other
permutation "s1-begin" "s1-create" "s2-create" "s1-commit"
// drops don't block each other
permutation "s1-create" "s2-create" "s1-begin" "s1-drop" "s2-drop" "s1-commit"
// create and drop don't block each other
permutation "s1-create" "s2-begin" "s2-create" "s1-drop" "s2-commit"
permutation "s2-create" "s2-begin" "s2-drop" "s1-create" "s2-commit"