Move table related part out from metadata

velioglu/wo_seq_test_1
Burak Velioglu 2022-01-19 13:24:33 +03:00
parent b082473563
commit 065db645b9
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
5 changed files with 60 additions and 38 deletions

View File

@ -1242,11 +1242,12 @@ FinalizeCitusLocalTableCreation(Oid relationId, List *dependentSequenceList)
if (ShouldSyncTableMetadata(relationId)) if (ShouldSyncTableMetadata(relationId))
{ {
CreateShellTableOnWorkers(relationId); CreateShellTableOnWorkers(relationId);
CreateTableMetadataOnWorkers(relationId);
ObjectAddress relationAddress = { 0 }; ObjectAddress relationAddress = { 0 };
ObjectAddressSet(relationAddress, RelationRelationId, relationId); ObjectAddressSet(relationAddress, RelationRelationId, relationId);
MarkObjectDistributed(&relationAddress); MarkObjectDistributed(&relationAddress);
CreateTableMetadataOnWorkers(relationId);
CreateInterTableRelationshipOfRelationOnWorkers(relationId);
} }
/* /*

View File

@ -539,6 +539,7 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
CreateShellTableOnWorkers(relationId); CreateShellTableOnWorkers(relationId);
MarkObjectDistributed(&tableAddress); MarkObjectDistributed(&tableAddress);
CreateTableMetadataOnWorkers(relationId); CreateTableMetadataOnWorkers(relationId);
CreateInterTableRelationshipOfRelationOnWorkers(relationId);
} }
/* /*
@ -601,7 +602,6 @@ void
EnsureSequenceTypeSupported(Oid seqOid, Oid seqTypId) EnsureSequenceTypeSupported(Oid seqOid, Oid seqTypId)
{ {
List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE); List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
Oid citusTableId = InvalidOid; Oid citusTableId = InvalidOid;
foreach_oid(citusTableId, citusTableIdList) foreach_oid(citusTableId, citusTableIdList)
{ {

View File

@ -374,7 +374,8 @@ GetCitusTableDDLCommandList(Oid relationId)
List * List *
ReplicateAllDependenciesToNodeCommandList(const char *nodeName, int nodePort) ReplicateAllDependenciesToNodeCommandList(const char *nodeName, int nodePort)
{ {
List *ddlCommands = NIL; /* since we are executing ddl commands disable propagation first, primarily for mx */
List *ddlCommands = list_make1(DISABLE_DDL_PROPAGATION);
/* /*
* collect all dependencies in creation order and get their ddl commands * collect all dependencies in creation order and get their ddl commands
@ -412,8 +413,6 @@ ReplicateAllDependenciesToNodeCommandList(const char *nodeName, int nodePort)
GetDependencyCreateDDLCommands(dependency)); GetDependencyCreateDDLCommands(dependency));
} }
/* since we are executing ddl commands lets disable propagation, primarily for mx */
ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands);
ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION); ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION);
return ddlCommands; return ddlCommands;

View File

@ -637,8 +637,7 @@ DistributedObjectMetadataSyncCommandList(void)
/* /*
* GetDistributedTableMetadataEvents returns the full set of DDL commands necessary to * GetDistributedTableMetadataEvents returns the full set of DDL commands necessary to
* create the given distributed table metadata on a worker. The list includes setting up * create the given distributed table metadata on a worker.
* any shard metadata, setting the truncate trigger and foreign key constraints.
*/ */
static List * static List *
GetDistributedTableMetadataEvents(Oid relationId) GetDistributedTableMetadataEvents(Oid relationId)
@ -647,41 +646,15 @@ GetDistributedTableMetadataEvents(Oid relationId)
List *commandList = NIL; List *commandList = NIL;
/* if the table is owned by an extension we only propagate pg_dist_* records */
bool tableOwnedByExtension = IsTableOwnedByExtension(relationId);
/* command to insert pg_dist_partition entry */ /* command to insert pg_dist_partition entry */
char *metadataCommand = DistributionCreateCommand(cacheEntry); char *metadataCommand = DistributionCreateCommand(cacheEntry);
commandList = lappend(commandList, metadataCommand); commandList = lappend(commandList, metadataCommand);
/* commands to create the truncate trigger of the table */
if (!IsForeignTable(relationId))
{
char *truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId);
commandList = lappend(commandList, truncateTriggerCreateCommand);
}
/* commands to insert pg_dist_shard & pg_dist_placement entries */ /* commands to insert pg_dist_shard & pg_dist_placement entries */
List *shardIntervalList = LoadShardIntervalList(relationId); List *shardIntervalList = LoadShardIntervalList(relationId);
List *shardMetadataInsertCommandList = ShardListInsertCommand(shardIntervalList); List *shardMetadataInsertCommandList = ShardListInsertCommand(shardIntervalList);
commandList = list_concat(commandList, shardMetadataInsertCommandList); commandList = list_concat(commandList, shardMetadataInsertCommandList);
if (!tableOwnedByExtension)
{
/* commands to create foreign key constraints */
List *foreignConstraintCommands =
GetReferencingForeignConstaintCommands(relationId);
commandList = list_concat(commandList, foreignConstraintCommands);
/* commands to create partitioning hierarchy */
if (PartitionTable(relationId))
{
char *alterTableAttachPartitionCommands =
GenerateAlterTableAttachPartitionCommand(relationId);
commandList = lappend(commandList, alterTableAttachPartitionCommands);
}
}
return commandList; return commandList;
} }
@ -1857,7 +1830,48 @@ HasMetadataWorkers(void)
/* /*
* CreateShellTableOnWorkers creates the shell table on each worker node with metadata. * CreateInterTableRelationshipOfRelationOnWorkers create inter table relationship
* for the the given relation id.
*/
void
CreateInterTableRelationshipOfRelationOnWorkers(Oid relationId)
{
/* if the table is owned by an extension we don't create */
bool tableOwnedByExtension = IsTableOwnedByExtension(relationId);
if (tableOwnedByExtension)
{
return;
}
List *commandList = NIL;
/* commands to create foreign key constraints */
List *foreignConstraintCommands =
GetReferencingForeignConstaintCommands(relationId);
commandList = list_concat(commandList, foreignConstraintCommands);
/* commands to create partitioning hierarchy */
if (PartitionTable(relationId))
{
char *alterTableAttachPartitionCommands =
GenerateAlterTableAttachPartitionCommand(relationId);
commandList = lappend(commandList, alterTableAttachPartitionCommands);
}
/* prevent recursive propagation */
SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
const char *command = NULL;
foreach_ptr(command, commandList)
{
SendCommandToWorkersWithMetadata(command);
}
}
/*
* CreateShellTableOnWorkers creates the shell table on each worker node with metadata
* including sequence dependency and truncate triggers.
*/ */
void void
CreateShellTableOnWorkers(Oid relationId) CreateShellTableOnWorkers(Oid relationId)
@ -1885,6 +1899,13 @@ CreateShellTableOnWorkers(Oid relationId)
List *sequenceDependencyCommandList = SequenceDependencyCommandList(relationId); List *sequenceDependencyCommandList = SequenceDependencyCommandList(relationId);
commandList = list_concat(commandList, sequenceDependencyCommandList); commandList = list_concat(commandList, sequenceDependencyCommandList);
/* commands to create the truncate trigger of the table */
if (!IsForeignTable(relationId))
{
char *truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId);
commandList = lappend(commandList, truncateTriggerCreateCommand);
}
/* prevent recursive propagation */ /* prevent recursive propagation */
SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
@ -1898,9 +1919,9 @@ CreateShellTableOnWorkers(Oid relationId)
/* /*
* CreateTableMetadataOnWorkers creates the list of commands needed to create the * CreateTableMetadataOnWorkers creates the list of commands needed to create the
* given distributed table and sends these commands to all metadata workers i.e. workers * metadata of the given distributed table and sends these commands to all metadata
* with hasmetadata=true. Before sending the commands, in order to prevent recursive * workers i.e. workers with hasmetadata=true. Before sending the commands, in order
* propagation, DDL propagation on workers are disabled with a * to prevent recursive propagation, DDL propagation on workers are disabled with a
* `SET citus.enable_ddl_propagation TO off;` command. * `SET citus.enable_ddl_propagation TO off;` command.
*/ */
void void

View File

@ -54,6 +54,7 @@ extern List * GrantOnSchemaDDLCommands(Oid schemaId);
extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState, extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
uint64 shardLength, int32 groupId); uint64 shardLength, int32 groupId);
extern char * TruncateTriggerCreateCommand(Oid relationId); extern char * TruncateTriggerCreateCommand(Oid relationId);
extern void CreateInterTableRelationshipOfRelationOnWorkers(Oid relationId);
extern void CreateShellTableOnWorkers(Oid relationId); extern void CreateShellTableOnWorkers(Oid relationId);
extern void CreateTableMetadataOnWorkers(Oid relationId); extern void CreateTableMetadataOnWorkers(Oid relationId);
extern List * DetachPartitionCommandList(void); extern List * DetachPartitionCommandList(void);