mirror of https://github.com/citusdata/citus.git
Change order
parent
fd0166c11d
commit
921fe6b6b6
|
@ -95,10 +95,20 @@ EnsureReferenceTablesExistOnAllNodes(void)
|
||||||
void
|
void
|
||||||
EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
|
EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
|
||||||
{
|
{
|
||||||
List *referenceTableIdList = NIL;
|
List *referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE);
|
||||||
uint64 shardId = INVALID_SHARD_ID;
|
Oid referenceTableId = list_length(referenceTableIdList) > 0 ? linitial_oid(
|
||||||
|
referenceTableIdList) : InvalidOid;
|
||||||
|
|
||||||
|
uint64 shardId = referenceTableId == InvalidOid ? InvalidOid : GetFirstShardId(
|
||||||
|
referenceTableId);
|
||||||
List *newWorkersList = NIL;
|
List *newWorkersList = NIL;
|
||||||
const char *referenceTableName = NULL;
|
const char *referenceTableName = NULL;
|
||||||
|
|
||||||
|
if (referenceTableId == InvalidOid)
|
||||||
|
{
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
int colocationId = CreateReferenceTableColocationId();
|
int colocationId = CreateReferenceTableColocationId();
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -122,44 +132,27 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
|
||||||
LOCKMODE lockmodes[] = { ShareLock, ExclusiveLock };
|
LOCKMODE lockmodes[] = { ShareLock, ExclusiveLock };
|
||||||
for (int l = 0; l < lengthof(lockmodes); l++)
|
for (int l = 0; l < lengthof(lockmodes); l++)
|
||||||
{
|
{
|
||||||
LockColocationId(colocationId, lockmodes[l]);
|
|
||||||
|
|
||||||
referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE);
|
|
||||||
if (referenceTableIdList == NIL)
|
|
||||||
{
|
|
||||||
/* no reference tables exist */
|
|
||||||
for (int ll = l; ll >= 0; ll--)
|
|
||||||
{
|
|
||||||
UnlockColocationId(colocationId, lockmodes[ll]);
|
|
||||||
}
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
Oid referenceTableId = linitial_oid(referenceTableIdList);
|
|
||||||
referenceTableName = get_rel_name(referenceTableId);
|
|
||||||
List *shardIntervalList = LoadShardIntervalList(referenceTableId);
|
|
||||||
if (list_length(shardIntervalList) == 0)
|
|
||||||
{
|
|
||||||
/* check for corrupt metadata */
|
|
||||||
ereport(ERROR, (errmsg("reference table \"%s\" does not have a shard",
|
|
||||||
referenceTableName)));
|
|
||||||
}
|
|
||||||
|
|
||||||
ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
|
|
||||||
shardId = shardInterval->shardId;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We only take an access share lock, otherwise we'll hold up citus_add_node.
|
* We only take an access share lock, otherwise we'll hold up citus_add_node.
|
||||||
* In case of create_reference_table() where we don't want concurrent writes
|
* In case of create_reference_table() where we don't want concurrent writes
|
||||||
* to pg_dist_node, we have already acquired ShareLock on pg_dist_node.
|
* to pg_dist_node, we have already acquired ShareLock on pg_dist_node.
|
||||||
*/
|
*/
|
||||||
newWorkersList = WorkersWithoutReferenceTablePlacement(shardId, ShareLock);
|
newWorkersList = WorkersWithoutReferenceTablePlacement(shardId, AccessShareLock);
|
||||||
if (list_length(newWorkersList) == 0)
|
if (list_length(newWorkersList) == 0)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* nothing to do, no need for lock, however we need to release all earlier
|
* nothing to do, no need for lock, however we need to release all earlier
|
||||||
* locks as well.
|
* locks as well.
|
||||||
*/
|
*/
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
LockColocationId(colocationId, lockmodes[l]);
|
||||||
|
|
||||||
|
referenceTableIdList = CitusTableTypeIdList(REFERENCE_TABLE);
|
||||||
|
if (referenceTableIdList == NIL)
|
||||||
|
{
|
||||||
|
/* no reference tables exist */
|
||||||
for (int ll = l; ll >= 0; ll--)
|
for (int ll = l; ll >= 0; ll--)
|
||||||
{
|
{
|
||||||
UnlockColocationId(colocationId, lockmodes[ll]);
|
UnlockColocationId(colocationId, lockmodes[ll]);
|
||||||
|
|
Loading…
Reference in New Issue