Simplify marking a sequence list as distributed

pg_dist_object-metadatasync
Jelte Fennema 2021-07-22 11:26:01 +02:00
parent f9a6d11e08
commit f3b7e53734
4 changed files with 28 additions and 37 deletions

View File

@ -1063,16 +1063,7 @@ FinalizeCitusLocalTableCreation(Oid relationId, List *dependentSequenceList)
PropagateSequenceListDependencies(dependentSequenceList); PropagateSequenceListDependencies(dependentSequenceList);
} }
CreateTableMetadataOnWorkers(relationId); CreateTableMetadataOnWorkers(relationId);
MarkSequenceListDistributed(dependentSequenceList);
Oid sequenceOid;
foreach_oid(sequenceOid, dependentSequenceList)
{
ObjectAddress address;
ObjectAddressSet(address, RelationRelationId, sequenceOid);
bool shouldSyncMetadata = true;
MarkObjectDistributed(&address, shouldSyncMetadata);
}
} }
/* /*

View File

@ -557,15 +557,7 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
{ {
if (ClusterHasKnownMetadataWorkers()) if (ClusterHasKnownMetadataWorkers())
{ {
Oid sequenceOid; MarkSequenceListDistributed(dependentSequenceList);
foreach_oid(sequenceOid, dependentSequenceList)
{
ObjectAddress address;
ObjectAddressSet(address, RelationRelationId, sequenceOid);
bool shouldSyncMetadata = true;
MarkObjectDistributed(&address, shouldSyncMetadata);
}
} }
} }
@ -695,6 +687,25 @@ PropagateSequenceDependencies(Oid sequenceOid)
} }
/*
* MarkSequenceListDistributed marks all sequences in the list as distributed.
* NOTE: The sequences should have already been created on the worker nodes.
*/
void
MarkSequenceListDistributed(List *sequenceList)
{
Oid sequenceOid = InvalidOid;
foreach_oid(sequenceOid, sequenceList)
{
ObjectAddress address;
ObjectAddressSet(address, RelationRelationId, sequenceOid);
bool shouldSyncMetadata = true;
MarkObjectDistributed(&address, shouldSyncMetadata);
}
}
/* /*
* EnsureDistributedSequencesHaveOneType first ensures that the type of the column * EnsureDistributedSequencesHaveOneType first ensures that the type of the column
* in which the sequence is used as default is supported for each sequence in input * in which the sequence is used as default is supported for each sequence in input

View File

@ -1600,7 +1600,7 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
EnsureDependenciesExistOnAllNodes(&tableAddress); EnsureDependenciesExistOnAllNodes(&tableAddress);
} }
List *newDistributedObjects = NIL; List *newDependentSequences = NIL;
List *commandList = alterTableStatement->cmds; List *commandList = alterTableStatement->cmds;
AlterTableCmd *command = NULL; AlterTableCmd *command = NULL;
@ -1692,12 +1692,8 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
{ {
PropagateSequenceDependencies( PropagateSequenceDependencies(
seqOid); seqOid);
ObjectAddress *sequenceAddress = palloc( newDependentSequences = lappend_oid(
sizeof(ObjectAddress)); newDependentSequences, seqOid);
ObjectAddressSet(*sequenceAddress, RelationRelationId,
seqOid);
newDistributedObjects = lappend(newDistributedObjects,
sequenceAddress);
} }
} }
} }
@ -1730,10 +1726,8 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
ClusterHasKnownMetadataWorkers()) ClusterHasKnownMetadataWorkers())
{ {
PropagateSequenceDependencies(seqOid); PropagateSequenceDependencies(seqOid);
ObjectAddress *sequenceAddress = palloc(sizeof(ObjectAddress)); newDependentSequences = lappend_oid(newDependentSequences,
ObjectAddressSet(*sequenceAddress, RelationRelationId, seqOid); seqOid);
newDistributedObjects = lappend(newDistributedObjects,
sequenceAddress);
} }
} }
} }
@ -1760,13 +1754,7 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
} }
SendCommandToWorkersWithMetadata(ENABLE_DDL_PROPAGATION); SendCommandToWorkersWithMetadata(ENABLE_DDL_PROPAGATION);
MarkSequenceListDistributed(newDependentSequences);
ObjectAddress *address;
foreach_ptr(address, newDistributedObjects)
{
bool shouldSyncMetadata = true;
MarkObjectDistributed(address, shouldSyncMetadata);
}
} }
} }

View File

@ -295,6 +295,7 @@ extern void EnsureSequenceTypeSupported(Oid seqOid, Oid seqTypId);
extern void AlterSequenceType(Oid seqOid, Oid typeOid); extern void AlterSequenceType(Oid seqOid, Oid typeOid);
extern void PropagateSequenceListDependencies(List *sequenceList); extern void PropagateSequenceListDependencies(List *sequenceList);
extern void PropagateSequenceDependencies(Oid sequenceOid); extern void PropagateSequenceDependencies(Oid sequenceOid);
extern void MarkSequenceListDistributed(List *sequenceList);
extern void EnsureDistributedSequencesHaveOneType(Oid relationId, extern void EnsureDistributedSequencesHaveOneType(Oid relationId,
List *dependentSequenceList, List *dependentSequenceList,
List *attnumList); List *attnumList);