mirror of https://github.com/citusdata/citus.git
Address reviews
parent
065db645b9
commit
97a1720a17
|
@ -1241,13 +1241,7 @@ FinalizeCitusLocalTableCreation(Oid relationId, List *dependentSequenceList)
|
||||||
|
|
||||||
if (ShouldSyncTableMetadata(relationId))
|
if (ShouldSyncTableMetadata(relationId))
|
||||||
{
|
{
|
||||||
CreateShellTableOnWorkers(relationId);
|
SyncCitusTableMetadata(relationId);
|
||||||
ObjectAddress relationAddress = { 0 };
|
|
||||||
ObjectAddressSet(relationAddress, RelationRelationId, relationId);
|
|
||||||
MarkObjectDistributed(&relationAddress);
|
|
||||||
|
|
||||||
CreateTableMetadataOnWorkers(relationId);
|
|
||||||
CreateInterTableRelationshipOfRelationOnWorkers(relationId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -536,10 +536,7 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
|
||||||
|
|
||||||
if (ShouldSyncTableMetadata(relationId))
|
if (ShouldSyncTableMetadata(relationId))
|
||||||
{
|
{
|
||||||
CreateShellTableOnWorkers(relationId);
|
SyncCitusTableMetadata(relationId);
|
||||||
MarkObjectDistributed(&tableAddress);
|
|
||||||
CreateTableMetadataOnWorkers(relationId);
|
|
||||||
CreateInterTableRelationshipOfRelationOnWorkers(relationId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -239,6 +239,21 @@ SyncNodeMetadataToNode(const char *nodeNameString, int32 nodePort)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SyncCitusTableMetadata syncs citus table metadata to worker nodes with metadata.
|
||||||
|
* Our definition of metadata includes the shell table and its inter relations with
|
||||||
|
* other shell tables, corresponding pg_dist_object, pg_dist_partiton, pg_dist_shard
|
||||||
|
* and pg_dist_shard placement entries.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
SyncCitusTableMetadata(Oid relationId)
|
||||||
|
{
|
||||||
|
CreateShellTableOnWorkers(relationId);
|
||||||
|
CreateTableMetadataOnWorkers(relationId);
|
||||||
|
CreateInterTableRelationshipOfRelationOnWorkers(relationId);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* EnsureSequentialModeMetadataOperations makes sure that the current transaction is
|
* EnsureSequentialModeMetadataOperations makes sure that the current transaction is
|
||||||
* already in sequential mode, or can still safely be put in sequential mode,
|
* already in sequential mode, or can still safely be put in sequential mode,
|
||||||
|
@ -650,6 +665,17 @@ GetDistributedTableMetadataEvents(Oid relationId)
|
||||||
char *metadataCommand = DistributionCreateCommand(cacheEntry);
|
char *metadataCommand = DistributionCreateCommand(cacheEntry);
|
||||||
commandList = lappend(commandList, metadataCommand);
|
commandList = lappend(commandList, metadataCommand);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Commands to create the truncate trigger of the table. We are creating
|
||||||
|
* that as a part of metadata since truncate trigger handles the metadata
|
||||||
|
* while dropping the table.
|
||||||
|
*/
|
||||||
|
if (!IsForeignTable(relationId))
|
||||||
|
{
|
||||||
|
char *truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId);
|
||||||
|
commandList = lappend(commandList, truncateTriggerCreateCommand);
|
||||||
|
}
|
||||||
|
|
||||||
/* commands to insert pg_dist_shard & pg_dist_placement entries */
|
/* commands to insert pg_dist_shard & pg_dist_placement entries */
|
||||||
List *shardIntervalList = LoadShardIntervalList(relationId);
|
List *shardIntervalList = LoadShardIntervalList(relationId);
|
||||||
List *shardMetadataInsertCommandList = ShardListInsertCommand(shardIntervalList);
|
List *shardMetadataInsertCommandList = ShardListInsertCommand(shardIntervalList);
|
||||||
|
@ -1899,13 +1925,6 @@ CreateShellTableOnWorkers(Oid relationId)
|
||||||
List *sequenceDependencyCommandList = SequenceDependencyCommandList(relationId);
|
List *sequenceDependencyCommandList = SequenceDependencyCommandList(relationId);
|
||||||
commandList = list_concat(commandList, sequenceDependencyCommandList);
|
commandList = list_concat(commandList, sequenceDependencyCommandList);
|
||||||
|
|
||||||
/* commands to create the truncate trigger of the table */
|
|
||||||
if (!IsForeignTable(relationId))
|
|
||||||
{
|
|
||||||
char *truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId);
|
|
||||||
commandList = lappend(commandList, truncateTriggerCreateCommand);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* prevent recursive propagation */
|
/* prevent recursive propagation */
|
||||||
SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
|
SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
|
||||||
|
|
||||||
|
@ -1914,6 +1933,14 @@ CreateShellTableOnWorkers(Oid relationId)
|
||||||
{
|
{
|
||||||
SendCommandToWorkersWithMetadata(command);
|
SendCommandToWorkersWithMetadata(command);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Mark the table object as distributed at the end as we need to propagate
|
||||||
|
* that table to new nodes anyway.
|
||||||
|
*/
|
||||||
|
ObjectAddress relationAddress = { 0 };
|
||||||
|
ObjectAddressSet(relationAddress, RelationRelationId, relationId);
|
||||||
|
MarkObjectDistributed(&relationAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -29,6 +29,7 @@ typedef enum
|
||||||
|
|
||||||
/* Functions declarations for metadata syncing */
|
/* Functions declarations for metadata syncing */
|
||||||
extern void SyncNodeMetadataToNode(const char *nodeNameString, int32 nodePort);
|
extern void SyncNodeMetadataToNode(const char *nodeNameString, int32 nodePort);
|
||||||
|
extern void SyncCitusTableMetadata(Oid relationId);
|
||||||
extern bool ClusterHasKnownMetadataWorkers(void);
|
extern bool ClusterHasKnownMetadataWorkers(void);
|
||||||
extern char * LocalGroupIdUpdateCommand(int32 groupId);
|
extern char * LocalGroupIdUpdateCommand(int32 groupId);
|
||||||
extern bool ShouldSyncTableMetadata(Oid relationId);
|
extern bool ShouldSyncTableMetadata(Oid relationId);
|
||||||
|
|
Loading…
Reference in New Issue