diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 9a200a059..b830fb500 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -440,11 +440,19 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio ObjectAddressSet(tableAddress, RelationRelationId, relationId); EnsureDependenciesExistOnAllNodes(&tableAddress); + /* + * For now assume that we can create table after ensuring that dependencies exist. + * Obviously it doesn't support sequences we don't care for it now. + * + * TODO: Consider partitioned tables + */ + CreateShellTableOnWorkers(relationId); + MarkObjectDistributed(&tableAddress); + char replicationModel = DecideReplicationModel(distributionMethod, colocateWithTableName, viaDeprecatedAPI); - /* * Due to dropping columns, the parent's distribution key may not match the * partition's distribution key. The input distributionColumn belongs to diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index ffb1222e6..0328c6f28 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -232,8 +232,18 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency) return NIL; } - /* if this relation is not supported, break to the error at the end */ - break; + List *commandList = NIL; + List *tableDDLCommands = GetFullTableCreationCommands(dependency->objectId, + WORKER_NEXTVAL_SEQUENCE_DEFAULTS); + + TableDDLCommand *tableDDLCommand = NULL; + foreach_ptr(tableDDLCommand, tableDDLCommands) + { + Assert(CitusIsA(tableDDLCommand, TableDDLCommand)); + commandList = lappend(commandList, GetTableDDLCommand(tableDDLCommand)); + } + + return commandList; } case OCLASS_COLLATION: diff --git a/src/backend/distributed/metadata/dependency.c b/src/backend/distributed/metadata/dependency.c index 975fd7d5b..fef79acde 100644 --- a/src/backend/distributed/metadata/dependency.c +++ b/src/backend/distributed/metadata/dependency.c @@ -665,16 +665,20 @@ SupportedDependencyByCitus(const ObjectAddress *address) case OCLASS_CLASS: { + char relKind = get_rel_relkind(address->objectId); + /* * composite types have a reference to a relation of composite type, we need * to follow those to get the dependencies of type fields. + * + * As we also handle tables as objects as well, follow dependencies + * for tables. */ - if (get_rel_relkind(address->objectId) == RELKIND_COMPOSITE_TYPE) + if (relKind == RELKIND_COMPOSITE_TYPE || + relKind == RELKIND_RELATION) { return true; } - - return false; } default: diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index ecd01a2ab..2992aa31b 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -89,7 +89,6 @@ static List * GetDistributedTableDDLEvents(Oid relationId); static void EnsureObjectMetadataIsSane(int distributionArgumentIndex, int colocationId); static char * LocalGroupIdUpdateCommand(int32 groupId); -static List * SequenceDependencyCommandList(Oid relationId); static char * TruncateTriggerCreateCommand(Oid relationId); static char * SchemaOwnerName(Oid objectId); static bool HasMetadataWorkers(void); @@ -106,7 +105,7 @@ static GrantStmt * GenerateGrantOnSchemaStmtForRights(Oid roleOid, Oid schemaOid, char *permission, bool withGrantOption); -static void SetLocalEnableDependencyCreation(bool state); + static char * GenerateSetRoleQuery(Oid roleOid); static void MetadataSyncSigTermHandler(SIGNAL_ARGS); static void MetadataSyncSigAlrmHandler(SIGNAL_ARGS); @@ -523,8 +522,7 @@ DropMetadataSnapshotOnNode(WorkerNode *workerNode) * following queries: * * (i) Query that populates pg_dist_node table - * (ii) Queries that create the clustered tables (including foreign keys, - * partitioning hierarchy etc.) + * (ii) Queries that create the foreign keys and partitioning hierarchy * (iii) Queries that populate pg_dist_partition table referenced by (ii) * (iv) Queries that populate pg_dist_shard table referenced by (iii) * (v) Queries that populate pg_dist_placement table referenced by (iv) @@ -538,7 +536,6 @@ MetadataCreateCommands(void) List *propagatedTableList = NIL; bool includeNodesFromOtherClusters = true; List *workerNodeList = ReadDistNode(includeNodesFromOtherClusters); - IncludeSequenceDefaults includeSequenceDefaults = WORKER_NEXTVAL_SEQUENCE_DEFAULTS; /* make sure we have deterministic output for our tests */ workerNodeList = SortList(workerNodeList, CompareWorkerNodes); @@ -558,125 +555,7 @@ MetadataCreateCommands(void) } } - /* create the tables, but not the metadata */ - foreach_ptr(cacheEntry, propagatedTableList) - { - Oid relationId = cacheEntry->relationId; - ObjectAddress tableAddress = { 0 }; - - if (IsTableOwnedByExtension(relationId)) - { - /* skip table creation when the Citus table is owned by an extension */ - continue; - } - - List *ddlCommandList = GetFullTableCreationCommands(relationId, - includeSequenceDefaults); - char *tableOwnerResetCommand = TableOwnerResetCommand(relationId); - - /* - * Tables might have dependencies on different objects, since we create shards for - * table via multiple sessions these objects will be created via their own connection - * and committed immediately so they become visible to all sessions creating shards. - */ - ObjectAddressSet(tableAddress, RelationRelationId, relationId); - - /* - * Set object propagation to off as we will mark objects distributed - * at the end of this function. - */ - bool prevDependencyCreationValue = EnableDependencyCreation; - SetLocalEnableDependencyCreation(false); - - EnsureDependenciesExistOnAllNodes(&tableAddress); - - /* - * Ensure sequence dependencies and mark them as distributed - */ - List *attnumList = NIL; - List *dependentSequenceList = NIL; - GetDependentSequencesWithRelation(relationId, &attnumList, - &dependentSequenceList, 0); - - Oid sequenceOid = InvalidOid; - foreach_oid(sequenceOid, dependentSequenceList) - { - ObjectAddress sequenceAddress = { 0 }; - ObjectAddressSet(sequenceAddress, RelationRelationId, sequenceOid); - EnsureDependenciesExistOnAllNodes(&sequenceAddress); - - /* - * Sequences are not marked as distributed while creating table - * if no metadata worker node exists. We are marking all sequences - * distributed while syncing metadata in such case. - */ - MarkObjectDistributed(&sequenceAddress); - } - - SetLocalEnableDependencyCreation(prevDependencyCreationValue); - - List *workerSequenceDDLCommands = SequenceDDLCommandsForTable(relationId); - metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList, - workerSequenceDDLCommands); - - /* ddlCommandList contains TableDDLCommand information, need to materialize */ - TableDDLCommand *tableDDLCommand = NULL; - foreach_ptr(tableDDLCommand, ddlCommandList) - { - Assert(CitusIsA(tableDDLCommand, TableDDLCommand)); - metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, - GetTableDDLCommand(tableDDLCommand)); - } - - metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, - tableOwnerResetCommand); - - List *sequenceDependencyCommandList = SequenceDependencyCommandList( - relationId); - metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList, - sequenceDependencyCommandList); - } - - /* construct the foreign key constraints after all tables are created */ - foreach_ptr(cacheEntry, propagatedTableList) - { - Oid relationId = cacheEntry->relationId; - - if (IsTableOwnedByExtension(relationId)) - { - /* skip foreign key creation when the Citus table is owned by an extension */ - continue; - } - - List *foreignConstraintCommands = - GetReferencingForeignConstaintCommands(relationId); - - metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList, - foreignConstraintCommands); - } - - /* construct partitioning hierarchy after all tables are created */ - foreach_ptr(cacheEntry, propagatedTableList) - { - Oid relationId = cacheEntry->relationId; - - if (IsTableOwnedByExtension(relationId)) - { - /* skip partition creation when the Citus table is owned by an extension */ - continue; - } - - if (PartitionTable(relationId)) - { - char *alterTableAttachPartitionCommands = - GenerateAlterTableAttachPartitionCommand(relationId); - - metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, - alterTableAttachPartitionCommands); - } - } - - /* after all tables are created, create the metadata */ + /* create the metadata */ foreach_ptr(cacheEntry, propagatedTableList) { Oid clusteredTableId = cacheEntry->relationId; @@ -814,25 +693,11 @@ GetDistributedTableDDLEvents(Oid relationId) CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); List *commandList = NIL; - IncludeSequenceDefaults includeSequenceDefaults = WORKER_NEXTVAL_SEQUENCE_DEFAULTS; /* if the table is owned by an extension we only propagate pg_dist_* records */ bool tableOwnedByExtension = IsTableOwnedByExtension(relationId); if (!tableOwnedByExtension) { - /* - * Commands to create the table, these commands are TableDDLCommands so lets - * materialize to the non-sharded version - */ - List *tableDDLCommands = GetFullTableCreationCommands(relationId, - includeSequenceDefaults); - TableDDLCommand *tableDDLCommand = NULL; - foreach_ptr(tableDDLCommand, tableDDLCommands) - { - Assert(CitusIsA(tableDDLCommand, TableDDLCommand)); - commandList = lappend(commandList, GetTableDDLCommand(tableDDLCommand)); - } - /* command to associate sequences with table */ List *sequenceDependencyCommandList = SequenceDependencyCommandList( relationId); @@ -1737,7 +1602,7 @@ GetSequencesFromAttrDef(Oid attrdefOid) * necessary to ensure that the sequence is dropped when the table is * dropped. */ -static List * +List * SequenceDependencyCommandList(Oid relationId) { List *sequenceCommandList = NIL; @@ -1975,7 +1840,7 @@ GenerateGrantOnSchemaStmtForRights(Oid roleOid, /* * SetLocalEnableDependencyCreation sets the enable_object_propagation locally */ -static void +void SetLocalEnableDependencyCreation(bool state) { set_config_option("citus.enable_object_propagation", state == true ? "on" : "off", @@ -2061,6 +1926,43 @@ HasMetadataWorkers(void) } +/* + * CreateShellTableOnWorkers creates shell table on workers. + */ +void +CreateShellTableOnWorkers(Oid relationId) +{ + /* if the table is owned by an extension we don't create */ + bool tableOwnedByExtension = IsTableOwnedByExtension(relationId); + + if (!tableOwnedByExtension) + { + List *commandList = NIL; + IncludeSequenceDefaults includeSequenceDefaults = + WORKER_NEXTVAL_SEQUENCE_DEFAULTS; + + List *tableDDLCommands = GetFullTableCreationCommands(relationId, + includeSequenceDefaults); + TableDDLCommand *tableDDLCommand = NULL; + foreach_ptr(tableDDLCommand, tableDDLCommands) + { + Assert(CitusIsA(tableDDLCommand, TableDDLCommand)); + commandList = lappend(commandList, GetTableDDLCommand(tableDDLCommand)); + } + + /* prevent recursive propagation */ + SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); + + /* send the commands one by one */ + const char *command = NULL; + foreach_ptr(command, commandList) + { + SendCommandToWorkersWithMetadata(command); + } + } +} + + /* * CreateTableMetadataOnWorkers creates the list of commands needed to create the * given distributed table and sends these commands to all metadata workers i.e. workers diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 6c8c94137..47b9125bb 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -40,6 +40,7 @@ #include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" +#include "distributed/multi_partitioning_utils.h" #include "distributed/shared_connection_stats.h" #include "distributed/string_utils.h" #include "distributed/transaction_recovery.h" @@ -105,7 +106,9 @@ static void InsertPlaceholderCoordinatorRecord(void); static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata *nodeMetadata); static void DeleteNodeRow(char *nodename, int32 nodeport); -static void SetUpDistributedTableDependencies(WorkerNode *workerNode); +static void SetUpSequences(WorkerNode *workerNode); +static void SetUpDistributedTableWithDependencies(WorkerNode *workerNode); +static void SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); static void PropagateNodeWideObjects(WorkerNode *newWorkerNode); static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort); @@ -576,10 +579,159 @@ master_set_node_property(PG_FUNCTION_ARGS) } +static void +SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode) +{ + List *distributedTableList = CitusTableList(); + List *propagatedTableList = NIL; + List *multipleTableIntegrationCommandList = NIL; + + CitusTableCacheEntry *cacheEntry = NULL; + foreach_ptr(cacheEntry, distributedTableList) + { + if (ShouldSyncTableMetadata(cacheEntry->relationId)) + { + propagatedTableList = lappend(propagatedTableList, cacheEntry); + } + } + + /* construct the foreign key constraints after all tables are created */ + foreach_ptr(cacheEntry, propagatedTableList) + { + Oid relationId = cacheEntry->relationId; + + if (IsTableOwnedByExtension(relationId)) + { + /* skip foreign key creation when the Citus table is owned by an extension */ + continue; + } + + List *foreignConstraintCommands = + GetReferencingForeignConstaintCommands(relationId); + + multipleTableIntegrationCommandList = list_concat( + multipleTableIntegrationCommandList, + foreignConstraintCommands); + } + + /* construct partitioning hierarchy after all tables are created */ + foreach_ptr(cacheEntry, propagatedTableList) + { + Oid relationId = cacheEntry->relationId; + + if (IsTableOwnedByExtension(relationId)) + { + /* skip partition creation when the Citus table is owned by an extension */ + continue; + } + + if (PartitionTable(relationId)) + { + char *alterTableAttachPartitionCommands = + GenerateAlterTableAttachPartitionCommand(relationId); + + multipleTableIntegrationCommandList = lappend( + multipleTableIntegrationCommandList, + alterTableAttachPartitionCommands); + } + } + + /* prevent recursive propagation */ + SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); + + /* send the commands one by one */ + const char *command = NULL; + foreach_ptr(command, multipleTableIntegrationCommandList) + { + SendCommandToWorkersWithMetadata(command); + } +} + + +static void +SetUpSequences(WorkerNode *workerNode) +{ + List *distributedTableList = CitusTableList(); + List *propagatedTableList = NIL; + List *sequenceCommandList = NIL; + + CitusTableCacheEntry *cacheEntry = NULL; + foreach_ptr(cacheEntry, distributedTableList) + { + if (ShouldSyncTableMetadata(cacheEntry->relationId)) + { + propagatedTableList = lappend(propagatedTableList, cacheEntry); + } + } + + /* create the metadata */ + foreach_ptr(cacheEntry, propagatedTableList) + { + Oid relationId = cacheEntry->relationId; + + if (IsTableOwnedByExtension(relationId)) + { + /* skip table metadata creation when the Citus table is owned by an extension */ + continue; + } + + /* + * Set object propagation to off as objects will be distributed while syncing + * the metadata. + */ + bool prevDependencyCreationValue = EnableDependencyCreation; + SetLocalEnableDependencyCreation(false); + + /* + * Ensure sequence dependencies and mark them as distributed + */ + List *attnumList = NIL; + List *dependentSequenceList = NIL; + GetDependentSequencesWithRelation(relationId, &attnumList, + &dependentSequenceList, 0); + + Oid sequenceOid = InvalidOid; + foreach_oid(sequenceOid, dependentSequenceList) + { + ObjectAddress sequenceAddress = { 0 }; + ObjectAddressSet(sequenceAddress, RelationRelationId, sequenceOid); + EnsureDependenciesExistOnAllNodes(&sequenceAddress); + + /* + * Sequences are not marked as distributed while creating table + * if no metadata worker node exists. We are marking all sequences + * distributed while syncing metadata in such case. + */ + MarkObjectDistributed(&sequenceAddress); + } + + SetLocalEnableDependencyCreation(prevDependencyCreationValue); + + List *workerSequenceDDLCommands = SequenceDDLCommandsForTable(relationId); + sequenceCommandList = list_concat(sequenceCommandList, workerSequenceDDLCommands); + + List *sequenceDependencyCommandList = SequenceDependencyCommandList(relationId); + sequenceCommandList = list_concat(sequenceCommandList, + sequenceDependencyCommandList); + + /* prevent recursive propagation */ + SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); + + /* send the commands one by one */ + const char *command = NULL; + foreach_ptr(command, sequenceCommandList) + { + SendCommandToWorkersWithMetadata(command); + } + } +} + + /* - * SetUpDistributedTableDependencies sets up up the following on a node if it's + * SetUpDistributedTableWithDependencies sets up up the following on a node if it's * a primary node that currently stores data: * - All dependencies (e.g., types, schemas) + * - All shell distributed table * - Reference tables, because they are needed to handle queries efficiently. * - Distributed functions * @@ -587,7 +739,7 @@ master_set_node_property(PG_FUNCTION_ARGS) * since all the dependencies should be present in the coordinator already. */ static void -SetUpDistributedTableDependencies(WorkerNode *newWorkerNode) +SetUpDistributedTableWithDependencies(WorkerNode *newWorkerNode) { if (NodeIsPrimary(newWorkerNode)) { @@ -896,7 +1048,9 @@ ActivateNode(char *nodeName, int nodePort) BoolGetDatum(isActive)); } - SetUpDistributedTableDependencies(workerNode); + SetUpSequences(workerNode); + SetUpDistributedTableWithDependencies(workerNode); + SetUpMultipleDistributedTableIntegrations(workerNode); if (syncMetadata) { diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index b39b90502..bcefd46b4 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -51,11 +51,14 @@ extern char * CreateSchemaDDLCommand(Oid schemaId); extern List * GrantOnSchemaDDLCommands(Oid schemaId); extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState, uint64 shardLength, int32 groupId); +extern void CreateShellTableOnWorkers(Oid relationId); extern void CreateTableMetadataOnWorkers(Oid relationId); extern BackgroundWorkerHandle * SpawnSyncMetadataToNodes(Oid database, Oid owner); extern void SyncMetadataToNodesMain(Datum main_arg); extern void SignalMetadataSyncDaemon(Oid database, int sig); extern bool ShouldInitiateMetadataSync(bool *lockFailure); +extern void SetLocalEnableDependencyCreation(bool state); +extern List * SequenceDependencyCommandList(Oid relationId); extern List * DDLCommandsForSequence(Oid sequenceOid, char *ownerName); extern List * SequenceDDLCommandsForTable(Oid relationId);