mirror of https://github.com/citusdata/citus.git
Update sequence dependency creation for alter table
parent
3260bf3513
commit
2458e50a6b
|
@ -113,8 +113,6 @@ static void EnsureLocalTableEmptyIfNecessary(Oid relationId, char distributionMe
|
|||
static bool ShouldLocalTableBeEmpty(Oid relationId, char distributionMethod, bool
|
||||
viaDeprecatedAPI);
|
||||
static void EnsureCitusTableCanBeCreated(Oid relationOid);
|
||||
static void EnsureSequenceExistOnMetadataWorkersForRelation(Oid relationId,
|
||||
Oid sequenceOid);
|
||||
static List * GetFKeyCreationCommandsRelationInvolvedWithTableType(Oid relationId,
|
||||
int tableTypeFlag);
|
||||
static Oid DropFKeysAndUndistributeTable(Oid relationId);
|
||||
|
@ -668,63 +666,6 @@ AlterSequenceType(Oid seqOid, Oid typeOid)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* MarkSequenceListDistributedAndPropagateWithDependencies ensures sequences and their
|
||||
* dependencies for the given sequence list exist on all nodes and marks them as distributed.
|
||||
*/
|
||||
void
|
||||
MarkSequenceListDistributedAndPropagateWithDependencies(Oid relationId,
|
||||
List *sequenceList)
|
||||
{
|
||||
Oid sequenceOid = InvalidOid;
|
||||
foreach_oid(sequenceOid, sequenceList)
|
||||
{
|
||||
MarkSequenceDistributedAndPropagateWithDependencies(relationId, sequenceOid);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* MarkSequenceDistributedAndPropagateWithDependencies ensures sequence and its'
|
||||
* dependencies for the given sequence exist on all nodes and marks them as distributed.
|
||||
*/
|
||||
void
|
||||
MarkSequenceDistributedAndPropagateWithDependencies(Oid relationId, Oid sequenceOid)
|
||||
{
|
||||
/* get sequence address */
|
||||
ObjectAddress sequenceAddress = { 0 };
|
||||
ObjectAddressSet(sequenceAddress, RelationRelationId, sequenceOid);
|
||||
EnsureDependenciesExistOnAllNodes(&sequenceAddress);
|
||||
EnsureSequenceExistOnMetadataWorkersForRelation(relationId, sequenceOid);
|
||||
MarkObjectDistributed(&sequenceAddress);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* EnsureSequenceExistOnMetadataWorkersForRelation ensures sequence for the given relation
|
||||
* exist on each worker node with metadata.
|
||||
*/
|
||||
static void
|
||||
EnsureSequenceExistOnMetadataWorkersForRelation(Oid relationId, Oid sequenceOid)
|
||||
{
|
||||
Assert(ShouldSyncTableMetadata(relationId));
|
||||
|
||||
char *ownerName = TableOwner(relationId);
|
||||
List *sequenceDDLList = DDLCommandsForSequence(sequenceOid, ownerName);
|
||||
|
||||
/* prevent recursive propagation */
|
||||
SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
|
||||
|
||||
const char *sequenceCommand = NULL;
|
||||
foreach_ptr(sequenceCommand, sequenceDDLList)
|
||||
{
|
||||
SendCommandToWorkersWithMetadata(sequenceCommand);
|
||||
}
|
||||
|
||||
SendCommandToWorkersWithMetadata(ENABLE_DDL_PROPAGATION);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* EnsureDistributedSequencesHaveOneType first ensures that the type of the column
|
||||
* in which the sequence is used as default is supported for each sequence in input
|
||||
|
|
|
@ -1955,6 +1955,17 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
|
|||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* Before ensuring each dependency exist, update dependent sequences
|
||||
* types if necessary.
|
||||
*/
|
||||
List *attnumList = NIL;
|
||||
List *dependentSequenceList = NIL;
|
||||
GetDependentSequencesWithRelation(relationId, &attnumList, &dependentSequenceList,
|
||||
0);
|
||||
EnsureDistributedSequencesHaveOneType(relationId, dependentSequenceList,
|
||||
attnumList);
|
||||
|
||||
/* changing a relation could introduce new dependencies */
|
||||
ObjectAddress tableAddress = { 0 };
|
||||
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
|
||||
|
@ -2045,17 +2056,9 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
|
|||
Oid seqOid = GetSequenceOid(relationId, attnum);
|
||||
if (seqOid != InvalidOid)
|
||||
{
|
||||
EnsureDistributedSequencesHaveOneType(relationId,
|
||||
list_make1_oid(
|
||||
seqOid),
|
||||
list_make1_int(
|
||||
attnum));
|
||||
|
||||
if (ShouldSyncTableMetadata(relationId))
|
||||
{
|
||||
needMetadataSyncForNewSequences = true;
|
||||
MarkSequenceDistributedAndPropagateWithDependencies(
|
||||
relationId, seqOid);
|
||||
alterTableDefaultNextvalCmd =
|
||||
GetAddColumnWithNextvalDefaultCmd(seqOid,
|
||||
relationId,
|
||||
|
@ -2087,15 +2090,9 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
|
|||
Oid seqOid = GetSequenceOid(relationId, attnum);
|
||||
if (seqOid != InvalidOid)
|
||||
{
|
||||
EnsureDistributedSequencesHaveOneType(relationId,
|
||||
list_make1_oid(seqOid),
|
||||
list_make1_int(attnum));
|
||||
|
||||
if (ShouldSyncTableMetadata(relationId))
|
||||
{
|
||||
needMetadataSyncForNewSequences = true;
|
||||
MarkSequenceDistributedAndPropagateWithDependencies(relationId,
|
||||
seqOid);
|
||||
alterTableDefaultNextvalCmd = GetAlterColumnWithNextvalDefaultCmd(
|
||||
seqOid, relationId, command->name);
|
||||
}
|
||||
|
|
|
@ -288,10 +288,6 @@ extern bool GetNodeDiskSpaceStatsForConnection(MultiConnection *connection,
|
|||
extern void ExecuteQueryViaSPI(char *query, int SPIOK);
|
||||
extern void EnsureSequenceTypeSupported(Oid seqOid, Oid seqTypId, Oid ownerRelationId);
|
||||
extern void AlterSequenceType(Oid seqOid, Oid typeOid);
|
||||
extern void MarkSequenceListDistributedAndPropagateWithDependencies(Oid relationId,
|
||||
List *sequenceList);
|
||||
extern void MarkSequenceDistributedAndPropagateWithDependencies(Oid relationId, Oid
|
||||
sequenceOid);
|
||||
extern void EnsureDistributedSequencesHaveOneType(Oid relationId,
|
||||
List *dependentSequenceList,
|
||||
List *attnumList);
|
||||
|
|
Loading…
Reference in New Issue