From 3636b7c9c530ea83cef3582b8c44fb2202d99649 Mon Sep 17 00:00:00 2001 From: Burak Velioglu Date: Thu, 16 Dec 2021 14:57:46 +0300 Subject: [PATCH] Start handling local tables --- .../citus_add_local_table_to_metadata.c | 33 ++++++++----------- .../commands/create_distributed_table.c | 8 +++-- .../distributed/metadata/metadata_sync.c | 19 +++++------ .../distributed/metadata/node_metadata.c | 29 ++++++++++++++-- 4 files changed, 52 insertions(+), 37 deletions(-) diff --git a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c index a73b06e38..5348ca315 100644 --- a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c +++ b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c @@ -30,6 +30,7 @@ #include "distributed/commands.h" #include "distributed/commands/sequence.h" #include "distributed/commands/utility_hook.h" +#include "distributed/metadata/distobject.h" #include "distributed/foreign_key_relationship.h" #include "distributed/listutils.h" #include "distributed/local_executor.h" @@ -306,6 +307,17 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys, bool autoConve ObjectAddress tableAddress = { 0 }; ObjectAddressSet(tableAddress, RelationRelationId, relationId); + /* + * Ensure that the sequences used in column defaults of the table + * have proper types + */ + List *attnumList = NIL; + List *dependentSequenceList = NIL; + GetDependentSequencesWithRelation(relationId, &attnumList, + &dependentSequenceList, 0); + EnsureDistributedSequencesHaveOneType(relationId, dependentSequenceList, + attnumList); + /* * Ensure dependencies first as we will create shell table on the other nodes * in the MX case. @@ -333,6 +345,7 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys, bool autoConve * via process utility. */ ExecuteAndLogUtilityCommandList(shellTableDDLEvents); + MarkObjectDistributed(&tableAddress); /* * Set shellRelationId as the relation with relationId now points @@ -354,17 +367,6 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys, bool autoConve InsertMetadataForCitusLocalTable(shellRelationId, shardId, autoConverted); - /* - * Ensure that the sequences used in column defaults of the table - * have proper types - */ - List *attnumList = NIL; - List *dependentSequenceList = NIL; - GetDependentSequencesWithRelation(shellRelationId, &attnumList, - &dependentSequenceList, 0); - EnsureDistributedSequencesHaveOneType(shellRelationId, dependentSequenceList, - attnumList); - FinalizeCitusLocalTableCreation(shellRelationId, dependentSequenceList); } @@ -1240,15 +1242,6 @@ FinalizeCitusLocalTableCreation(Oid relationId, List *dependentSequenceList) if (ShouldSyncTableMetadata(relationId)) { - if (ClusterHasKnownMetadataWorkers()) - { - /* - * Ensure sequence dependencies and mark them as distributed - * before creating table metadata on workers - */ - MarkSequenceListDistributedAndPropagateWithDependencies(relationId, - dependentSequenceList); - } CreateTableMetadataOnWorkers(relationId); } diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 96e4b5866..387d05ab5 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -449,11 +449,13 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio ObjectAddress tableAddress = { 0 }; ObjectAddressSet(tableAddress, RelationRelationId, relationId); EnsureDependenciesExistOnAllNodes(&tableAddress); - /* TODO: Update owner of the sequence(?) */ /* TODO: Consider partitioned tables */ - CreateShellTableOnWorkers(relationId); - MarkObjectDistributed(&tableAddress); + if (ShouldSyncTableMetadata(relationId)) + { + CreateShellTableOnWorkers(relationId); + MarkObjectDistributed(&tableAddress); + } char replicationModel = DecideReplicationModel(distributionMethod, colocateWithTableName, diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index f6c16dd22..323c419bc 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -83,7 +83,7 @@ char *EnableManualMetadataChangesForUser = ""; -static List * GetDistributedTableDDLEvents(Oid relationId); +static List * GetDistributedTableMetadataEvents(Oid relationId); static void EnsureObjectMetadataIsSane(int distributionArgumentIndex, int colocationId); static char * LocalGroupIdUpdateCommand(int32 groupId); @@ -539,13 +539,13 @@ MetadataCreateCommands(void) /* - * GetDistributedTableDDLEvents returns the full set of DDL commands necessary to + * GetDistributedTableMetadataEvents returns the full set of DDL commands necessary to * create the given distributed table on a worker. The list includes setting up any * sequences, setting the owner of the table, inserting table and shard metadata, * setting the truncate trigger and foreign key constraints. */ static List * -GetDistributedTableDDLEvents(Oid relationId) +GetDistributedTableMetadataEvents(Oid relationId) { CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); @@ -553,13 +553,6 @@ GetDistributedTableDDLEvents(Oid relationId) /* if the table is owned by an extension we only propagate pg_dist_* records */ bool tableOwnedByExtension = IsTableOwnedByExtension(relationId); - if (!tableOwnedByExtension) - { - /* command to associate sequences with table */ - List *sequenceDependencyCommandList = SequenceDependencyCommandList( - relationId); - commandList = list_concat(commandList, sequenceDependencyCommandList); - } /* command to insert pg_dist_partition entry */ char *metadataCommand = DistributionCreateCommand(cacheEntry); @@ -1788,6 +1781,10 @@ CreateShellTableOnWorkers(Oid relationId) commandList = lappend(commandList, GetTableDDLCommand(tableDDLCommand)); } + /* command to associate sequences with table */ + List *sequenceDependencyCommandList = SequenceDependencyCommandList(relationId); + commandList = list_concat(commandList, sequenceDependencyCommandList); + /* prevent recursive propagation */ SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); @@ -1811,7 +1808,7 @@ CreateShellTableOnWorkers(Oid relationId) void CreateTableMetadataOnWorkers(Oid relationId) { - List *commandList = GetDistributedTableDDLEvents(relationId); + List *commandList = GetDistributedTableMetadataEvents(relationId); /* prevent recursive propagation */ SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 5fd10a98b..b659f144b 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -824,14 +824,37 @@ ClearDistributedObjectsWithMetadataFromNode(WorkerNode *workerNode) clearDistTableInfoCommandList = list_concat(clearDistTableInfoCommandList, detachPartitionCommandList); + /* iterate over the commands and execute them in the same connection */ + const char *commandString = NULL; + foreach_ptr(commandString, clearDistTableInfoCommandList) + { + elog(WARNING, "Current command 1 is %s", commandString); + } + clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList, REMOVE_ALL_CLUSTERED_TABLES_COMMAND); + commandString = NULL; + foreach_ptr(commandString, clearDistTableInfoCommandList) + { + elog(WARNING, "Current command 2 is %s", commandString); + } clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList, DELETE_ALL_DISTRIBUTED_OBJECTS); + + commandString = NULL; + foreach_ptr(commandString, clearDistTableInfoCommandList) + { + elog(WARNING, "Current command 3 is %s", commandString); + } - List *clearDistTableCommands = list_make3(DISABLE_DDL_PROPAGATION, - clearDistTableInfoCommandList, - ENABLE_DDL_PROPAGATION); + clearDistTableInfoCommandList = list_concat(list_make1(DISABLE_DDL_PROPAGATION),clearDistTableInfoCommandList); + clearDistTableInfoCommandList = list_concat(clearDistTableInfoCommandList, list_make1(ENABLE_DDL_PROPAGATION)); + + commandString = NULL; + foreach_ptr(commandString, clearDistTableInfoCommandList) + { + elog(WARNING, "Current command 3 is %s", commandString); + } char *currentUser = CurrentUserName(); SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,