mirror of https://github.com/citusdata/citus.git
Address reviews
parent
505706d6f1
commit
7b84c72aae
|
@ -304,26 +304,6 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys, bool autoConve
|
|||
}
|
||||
}
|
||||
|
||||
ObjectAddress tableAddress = { 0 };
|
||||
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
|
||||
|
||||
/*
|
||||
* Ensure that the sequences used in column defaults of the table
|
||||
* have proper types
|
||||
*/
|
||||
List *attnumList = NIL;
|
||||
List *dependentSequenceList = NIL;
|
||||
GetDependentSequencesWithRelation(relationId, &attnumList,
|
||||
&dependentSequenceList, 0);
|
||||
EnsureDistributedSequencesHaveOneType(relationId, dependentSequenceList,
|
||||
attnumList);
|
||||
|
||||
/*
|
||||
* Ensure dependencies first as we will create shell table on the other nodes
|
||||
* in the MX case.
|
||||
*/
|
||||
EnsureDependenciesExistOnAllNodes(&tableAddress);
|
||||
|
||||
/*
|
||||
* Make sure that existing reference tables have been replicated to all
|
||||
* the nodes such that we can create foreign keys and joins work
|
||||
|
@ -366,6 +346,26 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys, bool autoConve
|
|||
|
||||
InsertMetadataForCitusLocalTable(shellRelationId, shardId, autoConverted);
|
||||
|
||||
ObjectAddress shellTableAddress = { 0 };
|
||||
ObjectAddressSet(shellTableAddress, RelationRelationId, shellRelationId);
|
||||
|
||||
/*
|
||||
* Ensure that the sequences used in column defaults of the table
|
||||
* have proper types
|
||||
*/
|
||||
List *attnumList = NIL;
|
||||
List *dependentSequenceList = NIL;
|
||||
GetDependentSequencesWithRelation(shellRelationId, &attnumList,
|
||||
&dependentSequenceList, 0);
|
||||
EnsureDistributedSequencesHaveOneType(shellRelationId, dependentSequenceList,
|
||||
attnumList);
|
||||
|
||||
/*
|
||||
* Ensure dependencies exist as we will create shell table on the other nodes
|
||||
* in the MX case.
|
||||
*/
|
||||
EnsureDependenciesExistOnAllNodes(&shellTableAddress);
|
||||
|
||||
FinalizeCitusLocalTableCreation(shellRelationId, dependentSequenceList);
|
||||
}
|
||||
|
||||
|
|
|
@ -432,26 +432,6 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
|
|||
DropFKeysRelationInvolvedWithTableType(relationId, INCLUDE_LOCAL_TABLES);
|
||||
}
|
||||
|
||||
/*
|
||||
* Ensure that the sequences used in column defaults of the table
|
||||
* have proper types
|
||||
*/
|
||||
List *attnumList = NIL;
|
||||
List *dependentSequenceList = NIL;
|
||||
GetDependentSequencesWithRelation(relationId, &attnumList, &dependentSequenceList, 0);
|
||||
EnsureDistributedSequencesHaveOneType(relationId, dependentSequenceList,
|
||||
attnumList);
|
||||
|
||||
/*
|
||||
* distributed tables might have dependencies on different objects, since we create
|
||||
* shards for a distributed table via multiple sessions these objects will be created
|
||||
* via their own connection and committed immediately so they become visible to all
|
||||
* sessions creating shards.
|
||||
*/
|
||||
ObjectAddress tableAddress = { 0 };
|
||||
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
|
||||
EnsureDependenciesExistOnAllNodes(&tableAddress);
|
||||
|
||||
char replicationModel = DecideReplicationModel(distributionMethod,
|
||||
colocateWithTableName,
|
||||
viaDeprecatedAPI);
|
||||
|
@ -503,6 +483,26 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
|
|||
InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn,
|
||||
colocationId, replicationModel, autoConverted);
|
||||
|
||||
/*
|
||||
* Ensure that the sequences used in column defaults of the table
|
||||
* have proper types
|
||||
*/
|
||||
List *attnumList = NIL;
|
||||
List *dependentSequenceList = NIL;
|
||||
GetDependentSequencesWithRelation(relationId, &attnumList, &dependentSequenceList, 0);
|
||||
EnsureDistributedSequencesHaveOneType(relationId, dependentSequenceList,
|
||||
attnumList);
|
||||
|
||||
/*
|
||||
* distributed tables might have dependencies on different objects, since we create
|
||||
* shards for a distributed table via multiple sessions these objects will be created
|
||||
* via their own connection and committed immediately so they become visible to all
|
||||
* sessions creating shards.
|
||||
*/
|
||||
ObjectAddress tableAddress = { 0 };
|
||||
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
|
||||
EnsureDependenciesExistOnAllNodes(&tableAddress);
|
||||
|
||||
/* foreign tables do not support TRUNCATE trigger */
|
||||
if (RegularTable(relationId))
|
||||
{
|
||||
|
@ -540,13 +540,6 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
|
|||
MarkObjectDistributed(&tableAddress);
|
||||
CreateTableMetadataOnWorkers(relationId);
|
||||
}
|
||||
else
|
||||
{
|
||||
/*
|
||||
* Ignore the tables we won't sync their metadata (most importantly append
|
||||
* distributed tables). We won't create shell table for them.
|
||||
*/
|
||||
}
|
||||
|
||||
/*
|
||||
* We've a custom way of foreign key graph invalidation,
|
||||
|
@ -603,14 +596,11 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
|
|||
* If any other distributed table uses the input sequence, it checks whether
|
||||
* the types of the columns using the sequence match. If they don't, it errors out.
|
||||
* Otherwise, the condition is ensured.
|
||||
* Since the owner relation id may not have been distributed yet, we need to check it
|
||||
* in addition all citus tables as well.
|
||||
*/
|
||||
void
|
||||
EnsureSequenceTypeSupported(Oid seqOid, Oid seqTypId, Oid ownerRelationId)
|
||||
EnsureSequenceTypeSupported(Oid seqOid, Oid seqTypId)
|
||||
{
|
||||
List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
|
||||
citusTableIdList = lappend_oid(citusTableIdList, ownerRelationId);
|
||||
|
||||
Oid citusTableId = InvalidOid;
|
||||
foreach_oid(citusTableId, citusTableIdList)
|
||||
|
@ -755,7 +745,7 @@ EnsureDistributedSequencesHaveOneType(Oid relationId, List *dependentSequenceLis
|
|||
* that sequence is supported
|
||||
*/
|
||||
Oid seqTypId = GetAttributeTypeOid(relationId, attnum);
|
||||
EnsureSequenceTypeSupported(sequenceOid, seqTypId, relationId);
|
||||
EnsureSequenceTypeSupported(sequenceOid, seqTypId);
|
||||
|
||||
/*
|
||||
* Alter the sequence's data type in the coordinator if needed.
|
||||
|
|
|
@ -2051,8 +2051,7 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
|
|||
list_make1_int(
|
||||
attnum));
|
||||
|
||||
if (ShouldSyncTableMetadata(relationId) &&
|
||||
ClusterHasKnownMetadataWorkers())
|
||||
if (ShouldSyncTableMetadata(relationId))
|
||||
{
|
||||
needMetadataSyncForNewSequences = true;
|
||||
MarkSequenceDistributedAndPropagateWithDependencies(
|
||||
|
@ -2092,8 +2091,7 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
|
|||
list_make1_oid(seqOid),
|
||||
list_make1_int(attnum));
|
||||
|
||||
if (ShouldSyncTableMetadata(relationId) &&
|
||||
ClusterHasKnownMetadataWorkers())
|
||||
if (ShouldSyncTableMetadata(relationId))
|
||||
{
|
||||
needMetadataSyncForNewSequences = true;
|
||||
MarkSequenceDistributedAndPropagateWithDependencies(relationId,
|
||||
|
@ -2627,8 +2625,7 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
|
|||
* We currently don't support adding a serial column for an MX table
|
||||
* TODO: record the dependency in the workers
|
||||
*/
|
||||
if (ShouldSyncTableMetadata(relationId) &&
|
||||
ClusterHasKnownMetadataWorkers())
|
||||
if (ShouldSyncTableMetadata(relationId))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg(
|
||||
|
|
|
@ -1857,7 +1857,7 @@ HasMetadataWorkers(void)
|
|||
|
||||
|
||||
/*
|
||||
* CreateShellTableOnWorkers creates shell table on workers.
|
||||
* CreateShellTableOnWorkers creates the shell table on each worker node with metadata.
|
||||
*/
|
||||
void
|
||||
CreateShellTableOnWorkers(Oid relationId)
|
||||
|
|
|
@ -287,7 +287,7 @@ extern bool GetNodeDiskSpaceStatsForConnection(MultiConnection *connection,
|
|||
uint64 *availableBytes,
|
||||
uint64 *totalBytes);
|
||||
extern void ExecuteQueryViaSPI(char *query, int SPIOK);
|
||||
extern void EnsureSequenceTypeSupported(Oid seqOid, Oid seqTypId, Oid ownerRelationId);
|
||||
extern void EnsureSequenceTypeSupported(Oid seqOid, Oid seqTypId);
|
||||
extern void AlterSequenceType(Oid seqOid, Oid typeOid);
|
||||
extern void MarkSequenceListDistributedAndPropagateWithDependencies(Oid relationId,
|
||||
List *sequenceList);
|
||||
|
|
Loading…
Reference in New Issue