mirror of https://github.com/citusdata/citus.git

Revisit locking of colocated table creation, specifically to allow multiple
reference tables to be created and dropped in parallel.

parent 43c2a1e88b
commit 5f73e44d7c

@@ -59,6 +59,7 @@
 #include "distributed/reference_table_utils.h"
 #include "distributed/relation_access_tracking.h"
 #include "distributed/remote_commands.h"
+#include "distributed/resource_lock.h"
 #include "distributed/shared_library_init.h"
 #include "distributed/shard_rebalancer.h"
 #include "distributed/worker_protocol.h"

@@ -471,9 +472,22 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName,
 	/*
 	 * Make sure that existing reference tables have been replicated to all the nodes
 	 * such that we can create foreign keys and joins work immediately after creation.
+	 *
+	 * This will take a lock on the nodes to make sure no nodes are added after we have
+	 * verified and ensured the reference tables are copied everywhere.
+	 * Although copying reference tables here for anything but creating a new colocation
+	 * group, it requires significant refactoring which we don't want to perform now.
 	 */
 	EnsureReferenceTablesExistOnAllNodes();
 
+	/*
+	 * While adding tables to a colocation group we need to make sure no concurrent
+	 * mutations happen on the colocation group with regards to its placements. It is
+	 * important that we have already copied any reference tables before acquiring this
+	 * lock as these are competing operations.
+	 */
+	LockColocationId(colocationId, ShareLock);
+
 	/* we need to calculate these variables before creating distributed metadata */
 	bool localTableEmpty = TableEmpty(relationId);
 	Oid colocatedTableId = ColocatedTableId(colocationId);
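
The ShareLock above is what makes the commit's goal possible: in PostgreSQL's lock conflict table, ShareLock does not conflict with itself, so any number of backends can create (or, later in this commit, drop) tables in the same colocation group concurrently, while ShareLock does conflict with the ExclusiveLock taken when reference table placements are copied and with the RowExclusiveLock taken while a new node is activated. The following is a standalone sketch of that subset of the conflict table, for illustration only; the enum, array, and program are made up and are not Citus or PostgreSQL code.

/* illustration only: subset of PostgreSQL's lock conflict table for the
 * four lock modes this commit takes on a colocation id */
#include <stdbool.h>
#include <stdio.h>

typedef enum
{
	MODE_ACCESS_SHARE = 0,  /* cheap precondition checks */
	MODE_ROW_EXCLUSIVE,     /* replicating reference tables to a new node */
	MODE_SHARE,             /* creating or dropping a table in the group */
	MODE_EXCLUSIVE          /* copying missing placements */
} ColocationLockMode;

static const bool conflicts[4][4] = {
	/*                   AS     RX     SH     EX   */
	/* AccessShare  */ { false, false, false, false },
	/* RowExclusive */ { false, false, true,  true  },
	/* Share        */ { false, true,  false, true  },
	/* Exclusive    */ { false, true,  true,  true  },
};

int
main(void)
{
	/* two create_reference_table()/DROP TABLE backends: both take Share */
	printf("Share vs Share blocks: %d\n",
		   conflicts[MODE_SHARE][MODE_SHARE]);

	/* a backend copying placements while another creates a table */
	printf("Share vs Exclusive blocks: %d\n",
		   conflicts[MODE_SHARE][MODE_EXCLUSIVE]);
	return 0;
}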

@@ -513,7 +527,7 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName,
 		 * This function does not expect to create Citus local table, so we blindly
 		 * create reference table when the method is DISTRIBUTE_BY_NONE.
 		 */
-		CreateReferenceTableShard(relationId);
+		CreateReferenceTableShard(relationId, colocatedTableId);
 	}
 
 	if (ShouldSyncTableMetadata(relationId))

@@ -153,12 +153,12 @@ PreprocessDropTableStmt(Node *node, const char *queryString,
 			continue;
 		}
 
-		if (IsCitusTableType(relationId, REFERENCE_TABLE))
-		{
-			/* prevent concurrent EnsureReferenceTablesExistOnAllNodes */
-			int colocationId = CreateReferenceTableColocationId();
-			LockColocationId(colocationId, ExclusiveLock);
-		}
+		/*
+		 * While changing the tables that are part of a colocation group we need to
+		 * prevent concurrent mutations to the placements of the shard groups.
+		 */
+		CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
+		LockColocationId(cacheEntry->colocationId, ShareLock);
 
 		/* invalidate foreign key cache if the table involved in any foreign key */
 		if ((TableReferenced(relationId) || TableReferencing(relationId)))
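
The DROP path now keys its lock on the dropped table's own colocation id (read from its cache entry) rather than on the reference table colocation id, and takes it in Share rather than Exclusive mode. A minimal standalone model of the effect, with made-up table names and ids:

/* illustration only: the DROP path locks by colocation id, in Share mode */
#include <stdio.h>

typedef struct
{
	const char *name;
	int colocationId;
} ModelTable;

/* hypothetical catalog: two reference tables share one colocation group */
static const ModelTable tables[] = {
	{ "ref_a", 7 },
	{ "ref_b", 7 },
	{ "dist_c", 12 },
};

int
main(void)
{
	for (int i = 0; i < 3; i++)
	{
		/* Share mode: drops of ref_a and ref_b do not block each other, but
		 * both block (and are blocked by) an Exclusive placement copy on
		 * colocation id 7; dist_c, in another group, is unaffected */
		printf("DROP %s -> ShareLock on colocation %d\n",
			   tables[i].name, tables[i].colocationId);
	}
	return 0;
}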

@@ -326,7 +326,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
  * Also, the shard is replicated to the all active nodes in the cluster.
  */
 void
-CreateReferenceTableShard(Oid distributedTableId)
+CreateReferenceTableShard(Oid distributedTableId, Oid colocatedTableId)
 {
 	int workerStartIndex = 0;
 	text *shardMinValue = NULL;

@@ -365,7 +365,7 @@ CreateReferenceTableShard(Oid distributedTableId)
 	List *nodeList = ReferenceTablePlacementNodeList(ShareLock);
 	nodeList = SortList(nodeList, CompareWorkerNodes);
 
-	int replicationFactor = ReferenceTableReplicationFactor();
+	int replicationFactor = list_length(nodeList);
 
 	/* get the next shard id */
 	uint64 shardId = GetNextShardId();

@@ -95,56 +95,83 @@ EnsureReferenceTablesExistOnAllNodes(void)
 void
 EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
 {
-	/*
-	 * Prevent this function from running concurrently with itself.
-	 *
-	 * It also prevents concurrent DROP TABLE or DROP SCHEMA. We need this
-	 * because through-out this function we assume values in referenceTableIdList
-	 * are still valid.
-	 *
-	 * We don't need to handle other kinds of reference table DML/DDL here, since
-	 * master_copy_shard_placement gets enough locks for that.
-	 *
-	 * We also don't need special handling for concurrent create_refernece_table.
-	 * Since that will trigger a call to this function from another backend,
-	 * which will block until our call is finished.
-	 */
-	int colocationId = CreateReferenceTableColocationId();
-	LockColocationId(colocationId, ExclusiveLock);
-
-	List *referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE);
-	if (referenceTableIdList == NIL)
-	{
-		/* no reference tables exist */
-		UnlockColocationId(colocationId, ExclusiveLock);
-		return;
-	}
-
-	Oid referenceTableId = linitial_oid(referenceTableIdList);
-	const char *referenceTableName = get_rel_name(referenceTableId);
-	List *shardIntervalList = LoadShardIntervalList(referenceTableId);
-	if (list_length(shardIntervalList) == 0)
-	{
-		/* check for corrupt metadata */
-		ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard",
-							   referenceTableName)));
-	}
-
-	ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
-	uint64 shardId = shardInterval->shardId;
-
-	/*
-	 * We only take an access share lock, otherwise we'll hold up citus_add_node.
-	 * In case of create_reference_table() where we don't want concurrent writes
-	 * to pg_dist_node, we have already acquired ShareLock on pg_dist_node.
-	 */
-	List *newWorkersList = WorkersWithoutReferenceTablePlacement(shardId,
-																 AccessShareLock);
-	if (list_length(newWorkersList) == 0)
-	{
-		/* nothing to do, no need for lock */
-		UnlockColocationId(colocationId, ExclusiveLock);
-		return;
+	List *referenceTableIdList = NIL;
+	uint64 shardId = INVALID_SHARD_ID;
+	List *newWorkersList = NIL;
+	const char *referenceTableName = NULL;
+	int colocationId = GetReferenceTableColocationId();
+
+	if (colocationId == INVALID_COLOCATION_ID)
+	{
+		/* no colocation for reference tables available */
+		return;
+	}
+
+	/*
+	 * Most of the time this function should result in a conclusion where we do not need
+	 * to copy any reference tables. To prevent excessive locking the majority of the time
+	 * we run our precondition checks first with a lower lock. If, after checking with the
+	 * lower lock, that we might need to copy reference tables we check with a more
+	 * aggressive and self conflicting lock. It is important to be self conflicting in the
+	 * second run to make sure that two concurrent calls to this routine will actually not
+	 * run concurrently after the initial check. That is also the reason why we release
+	 * the lock between the two iterations of precondition checks.
+	 *
+	 * If after two iterations of precondition checks we still find the need for copying
+	 * reference tables we exit the forloop with the last lock held. This will prevent
+	 * concurrent DROP TABLE and create_reference_table calls so that the list of
+	 * reference tables we operate on are stable.
+	 *
+	 * Since the changes to the reference table placements are made via loopback
+	 * connections we release the final lock held at the end of this function.
+	 */
+	LOCKMODE lockmodes[] = { AccessShareLock, ExclusiveLock };
+	for (int l = 0; l < lengthof(lockmodes); l++)
+	{
+		LockColocationId(colocationId, lockmodes[l]);
+
+		referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE);
+		if (referenceTableIdList == NIL)
+		{
+			/* no reference tables exist */
+			for (int ll = l; ll >= 0; ll--)
+			{
+				UnlockColocationId(colocationId, lockmodes[ll]);
+			}
+			return;
+		}
+
+		Oid referenceTableId = linitial_oid(referenceTableIdList);
+		referenceTableName = get_rel_name(referenceTableId);
+		List *shardIntervalList = LoadShardIntervalList(referenceTableId);
+		if (list_length(shardIntervalList) == 0)
+		{
+			/* check for corrupt metadata */
+			ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard",
+								   referenceTableName)));
+		}
+
+		ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
+		shardId = shardInterval->shardId;
+
+		/*
+		 * We only take an access share lock, otherwise we'll hold up citus_add_node.
+		 * In case of create_reference_table() where we don't want concurrent writes
+		 * to pg_dist_node, we have already acquired ShareLock on pg_dist_node.
+		 */
+		newWorkersList = WorkersWithoutReferenceTablePlacement(shardId, AccessShareLock);
+		if (list_length(newWorkersList) == 0)
+		{
+			/*
+			 * nothing to do, no need for lock, however we need to release all earlier
+			 * locks as well.
+			 */
+			for (int ll = l; ll >= 0; ll--)
+			{
+				UnlockColocationId(colocationId, lockmodes[ll]);
+			}
+			return;
+		}
 	}
 
 	/*

@@ -233,11 +260,11 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
 		CloseConnection(connection);
 	}
 
-	/*
-	 * Unblock other backends, they will probably observe that there are no
-	 * more worker nodes without placements, unless nodes were added concurrently
-	 */
-	UnlockColocationId(colocationId, ExclusiveLock);
+	/* release all the locks we acquired for the above operations */
+	for (int ll = lengthof(lockmodes) - 1; ll >= 0; ll--)
+	{
+		UnlockColocationId(colocationId, lockmodes[ll]);
+	}
 }
 
 
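
The rewritten function follows a check/escalate/re-check pattern: the common case (all placements already exist) is decided under AccessShareLock, which conflicts with nothing taken elsewhere in this commit, and only when work might be needed does the loop run again with the self-conflicting ExclusiveLock and repeat the same checks before actually copying anything. All locks taken so far are released, in reverse order, on every early exit and again at the end. A standalone model of that control flow follows; the lock and check helpers are stand-ins, not Citus APIs.

/* standalone model (not Citus code) of the check/escalate/re-check pattern */
#include <stdbool.h>
#include <stdio.h>

typedef enum { MODEL_ACCESS_SHARE, MODEL_EXCLUSIVE } ModelLockMode;

static const char *modeNames[] = { "AccessShare", "Exclusive" };

static void
model_lock(int colocationId, ModelLockMode mode)
{
	printf("lock colocation %d in %s mode\n", colocationId, modeNames[mode]);
}

static void
model_unlock(int colocationId, ModelLockMode mode)
{
	printf("unlock colocation %d (%s)\n", colocationId, modeNames[mode]);
}

/* stand-in for the "are placements missing on some worker?" check */
static bool
placements_missing(void)
{
	return true;
}

static void
ensure_reference_tables(int colocationId)
{
	ModelLockMode lockmodes[] = { MODEL_ACCESS_SHARE, MODEL_EXCLUSIVE };
	int lockCount = (int) (sizeof(lockmodes) / sizeof(lockmodes[0]));

	for (int l = 0; l < lockCount; l++)
	{
		model_lock(colocationId, lockmodes[l]);

		if (!placements_missing())
		{
			/* common case: nothing to copy, release every lock taken so far */
			for (int ll = l; ll >= 0; ll--)
			{
				model_unlock(colocationId, lockmodes[ll]);
			}
			return;
		}

		/*
		 * The cheap pass says work may be needed; the second pass re-checks
		 * under the self-conflicting lock so two backends cannot both decide
		 * to copy placements.
		 */
	}

	printf("copy missing placements for colocation %d\n", colocationId);

	/* work is done over separate connections; release locks in reverse order */
	for (int ll = lockCount - 1; ll >= 0; ll--)
	{
		model_unlock(colocationId, lockmodes[ll]);
	}
}

int
main(void)
{
	ensure_reference_tables(42);
	return 0;
}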

@@ -424,6 +451,28 @@ CreateReferenceTableColocationId()
 }
 
 
+uint32
+GetReferenceTableColocationId()
+{
+	int shardCount = 1;
+	Oid distributionColumnType = InvalidOid;
+	Oid distributionColumnCollation = InvalidOid;
+
+	/*
+	 * We don't maintain replication factor of reference tables anymore and
+	 * just use -1 instead. We don't use this value in any places.
+	 */
+	int replicationFactor = -1;
+
+	/* check for existing colocations */
+	uint32 colocationId =
+		ColocationId(shardCount, replicationFactor, distributionColumnType,
+					 distributionColumnCollation);
+
+	return colocationId;
+}
+
+
 /*
  * DeleteAllReplicatedTablePlacementsFromNodeGroup function iterates over
  * list of reference and replicated hash distributed tables and deletes
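
GetReferenceTableColocationId() finds the single colocation group shared by all reference tables through its fixed signature (a shard count of 1 and the placeholder replication factor of -1), and returns INVALID_COLOCATION_ID when create_reference_table() has never been run. A caller-side sketch of how the call sites in this commit use it; the function name is made up, the include list is approximate, and error handling is omitted:

#include "postgres.h"

#include "distributed/colocation_utils.h"
#include "distributed/reference_table_utils.h"
#include "distributed/resource_lock.h"

static void
InspectReferenceTablePlacements(void)
{
	uint32 colocationId = GetReferenceTableColocationId();
	if (colocationId == INVALID_COLOCATION_ID)
	{
		/* no reference table exists yet, nothing to lock or copy */
		return;
	}

	/* cheap, non-conflicting lock while only reading placement metadata */
	LockColocationId(colocationId, AccessShareLock);

	/* ... read the placements of the group here ... */

	UnlockColocationId(colocationId, AccessShareLock);
}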

@@ -528,19 +577,6 @@ CompareOids(const void *leftElement, const void *rightElement)
 }
 
 
-/*
- * ReferenceTableReplicationFactor returns the replication factor for
- * reference tables.
- */
-int
-ReferenceTableReplicationFactor(void)
-{
-	List *nodeList = ReferenceTablePlacementNodeList(NoLock);
-	int replicationFactor = list_length(nodeList);
-	return replicationFactor;
-}
-
-
 /*
  * ReplicateAllReferenceTablesToNode function finds all reference tables and
  * replicates them to the given worker node. It also modifies pg_dist_colocation

@@ -551,6 +587,16 @@ ReferenceTableReplicationFactor(void)
 void
 ReplicateAllReferenceTablesToNode(WorkerNode *workerNode)
 {
+	int colocationId = GetReferenceTableColocationId();
+	if (colocationId == INVALID_COLOCATION_ID)
+	{
+		/* no reference tables in system */
+		return;
+	}
+
+	/* prevent changes in table set while replicating reference tables */
+	LockColocationId(colocationId, RowExclusiveLock);
+
 	List *referenceTableList = CitusTableTypeIdList(REFERENCE_TABLE);
 
 	/* if there is no reference table, we do not need to replicate anything */
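
The RowExclusiveLock taken here slots into the same conflict table as the sketch after the CreateDistributedTable hunk: it conflicts with the ShareLock of create_reference_table()/DROP TABLE and with the ExclusiveLock of a placement copy, but not with the AccessShareLock used for cheap precondition checks, so replicating reference tables to a node serializes with table-set changes without blocking readers of the placement metadata. A short illustration of that, reusing the earlier conflict matrix; the output only describes this one lock, not every lock the surrounding operations take:

/* illustration only: who waits on the colocation lock while a node
 * activation holds it in RowExclusive mode */
#include <stdbool.h>
#include <stdio.h>

enum { ACCESS_SHARE, ROW_EXCLUSIVE, SHARE, EXCLUSIVE };

static const bool conflicts[4][4] = {
	{ false, false, false, false },
	{ false, false, true,  true  },
	{ false, true,  false, true  },
	{ false, true,  true,  true  },
};

int
main(void)
{
	const char *ops[] = { "precondition check (AccessShare)",
						  "another node activation (RowExclusive)",
						  "create/drop reference table (Share)",
						  "placement copy (Exclusive)" };

	for (int mode = 0; mode < 4; mode++)
	{
		printf("%s %s on the colocation lock\n", ops[mode],
			   conflicts[ROW_EXCLUSIVE][mode] ? "waits" : "does not wait");
	}
	return 0;
}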

@@ -249,7 +249,7 @@ extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shard
 											 bool useExclusiveConnections);
 extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId,
 								  bool useExclusiveConnections);
-extern void CreateReferenceTableShard(Oid distributedTableId);
+extern void CreateReferenceTableShard(Oid distributedTableId, Oid colocatedTableId);
 extern List * WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId,
 										   List *ddlCommandList,
 										   List *foreignConstraintCommandList);

@@ -21,10 +21,10 @@
 extern void EnsureReferenceTablesExistOnAllNodes(void);
 extern void EnsureReferenceTablesExistOnAllNodesExtended(char transferMode);
 extern uint32 CreateReferenceTableColocationId(void);
+extern uint32 GetReferenceTableColocationId(void);
 extern void DeleteAllReplicatedTablePlacementsFromNodeGroup(int32 groupId,
 															bool localOnly);
 extern int CompareOids(const void *leftElement, const void *rightElement);
-extern int ReferenceTableReplicationFactor(void);
 extern void ReplicateAllReferenceTablesToNode(WorkerNode *workerNode);
 
 #endif /* REFERENCE_TABLE_UTILS_H_ */