diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c index f9dbba8ad..21d742c14 100644 --- a/src/backend/distributed/commands/dependencies.c +++ b/src/backend/distributed/commands/dependencies.c @@ -31,26 +31,56 @@ typedef bool (*AddressPredicate)(const ObjectAddress *); static List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency); static List * FilterObjectAddressListByPredicate(List *objectAddressList, AddressPredicate predicate); +static void EnsureDependenciesExistOnAllNodesInternal(const ObjectAddress *target, + bool localOnlyMarkDistributed); bool EnableDependencyCreation = true; /* - * EnsureDependenciesExistOnAllNodes finds all the dependencies that we support and makes - * sure these are available on all workers. If not available they will be created on the - * workers via a separate session that will be committed directly so that the objects are - * visible to potentially multiple sessions creating the shards. + * EnsureDependenciesExistOnAllNodes finds all the dependencies that we support + * and makes sure these are available on all workers. If not available they + * will be created on the workers via a separate session that will be committed + * directly so that the objects are visible to potentially multiple sessions + * creating the shards. After making the objects available it also marks them + * as distributed. * - * Note; only the actual objects are created via a separate session, the local records to - * pg_dist_object are created in this session. As a side effect the objects could be - * created on the workers without a catalog entry on the coordinator. Updates to the - * objects on the coordinator are not propagated to the workers until the record is - * visible on the coordinator. + * Note; only the actual objects are created via a separate session, the + * records to pg_dist_object are created in this session. As a side effect the + * objects could be created on the workers without a catalog entry on the + * coordinator. Updates to the objects on the coordinator are not propagated to + * the workers until the record is visible on the coordinator. * * This is solved by creating the dependencies in an idempotent manner, either via * postgres native CREATE IF NOT EXISTS, or citus helper functions. */ -List * -EnsureDependenciesExistOnAllNodesWithoutMarkingDistributed(const ObjectAddress *target) +void +EnsureDependenciesExistOnAllNodes(const ObjectAddress *target) +{ + EnsureDependenciesExistOnAllNodesInternal(target, false); +} + + +/* + * EnsureDependenciesExistOnAllNodesLocalOnlyMarkDistributed is the same as + * EnsureDependenciesExistOnAllNodes, except that it only marks objects + * distributed on the local node. This should never be used, except during + * initial metadata syncing. + */ +void +EnsureDependenciesExistOnAllNodesLocalOnlyMarkDistributed(const ObjectAddress *target) +{ + EnsureDependenciesExistOnAllNodesInternal(target, true); +} + + +/* + * EnsureDependenciesExistOnAllNodesInternal is the internal function used by + * EnsureDependenciesExistOnAllNodes and + * EnsureDependenciesExistOnAllNodesLocalOnlyMarkDistributed. + */ +static void +EnsureDependenciesExistOnAllNodesInternal(const ObjectAddress *target, + bool localOnlyMarkDistributed) { List *dependenciesWithCommands = NIL; List *ddlCommands = NULL; @@ -69,10 +99,11 @@ EnsureDependenciesExistOnAllNodesWithoutMarkingDistributed(const ObjectAddress * dependenciesWithCommands = lappend(dependenciesWithCommands, dependency); } } + if (list_length(ddlCommands) <= 0) { /* no ddl commands to be executed */ - return NIL; + return; } /* since we are executing ddl commands lets disable propagation, primarily for mx */ @@ -102,27 +133,15 @@ EnsureDependenciesExistOnAllNodesWithoutMarkingDistributed(const ObjectAddress * ddlCommands); } - return dependenciesWithCommands; -} - - -void -EnsureDependenciesExistOnAllNodes(const ObjectAddress *target) -{ - List *dependenciesWithCommands = - EnsureDependenciesExistOnAllNodesWithoutMarkingDistributed(target); - /* * It is possible to create distributed tables which depend on other dependencies * before any node is in the cluster. If we would wait till we actually had connected * to the nodes before marking the objects as distributed these objects would never be * created on the workers when they get added, causing shards to fail to create. */ - ObjectAddress *dependency = NULL; foreach_ptr(dependency, dependenciesWithCommands) { - bool localOnly = false; - MarkObjectDistributed(dependency, localOnly); + MarkObjectDistributed(dependency, localOnlyMarkDistributed); } } diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index e059aa386..35d5aa3d6 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -358,10 +358,8 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError) /* generate the queries which drop the metadata */ List *dropMetadataCommandList = MetadataDropCommands(); - List *newDistributedObjects = NIL; - /* generate the queries which create the metadata from scratch */ - List *createMetadataCommandList = MetadataCreateCommands(&newDistributedObjects); + List *createMetadataCommandList = MetadataCreateCommands(); List *recreateMetadataSnapshotCommandList = list_make1(localGroupIdUpdateCommand); recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList, @@ -394,14 +392,8 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError) return false; } } - MarkNodeMetadataSynced(workerNode->workerName, workerNode->workerPort, true); - ObjectAddress *address; - foreach_ptr(address, newDistributedObjects) - { - bool localOnly = false; - MarkObjectDistributed(address, localOnly); - } + MarkNodeMetadataSynced(workerNode->workerName, workerNode->workerPort, true); return true; } @@ -441,17 +433,9 @@ DropMetadataSnapshotOnNode(WorkerNode *workerNode) * (iv) Queries that populate pg_dist_shard table referenced by (iii) * (v) Queries that populate pg_dist_placement table referenced by (iv) * (vi) Queries that populate pg_dist_object - * - * The first argument will be filled with a list of objects that are after the - * returned commands have been run. These objects should then be marked - * distributed afterwards. This function does not mark them as distributed - * directly. The reason for that is marking as distributed is also done on the - * metadata workers, and thus can only be done safely once all nodes their - * metadata is up to date. Since this function is only called when at least one - * node is out of sync, doing it in this function would always fail. */ List * -MetadataCreateCommands(List **newDistributedObjects) +MetadataCreateCommands() { List *metadataSnapshotCommandList = NIL; List *distributedTableList = CitusTableList(); @@ -513,14 +497,30 @@ MetadataCreateCommands(List **newDistributedObjects) Oid sequenceOid; foreach_oid(sequenceOid, dependentSequenceList) { - ObjectAddress *sequenceAddress = palloc(sizeof(ObjectAddress)); - ObjectAddressSet(*sequenceAddress, RelationRelationId, sequenceOid); - List *addedDependencies = - EnsureDependenciesExistOnAllNodesWithoutMarkingDistributed( - sequenceAddress); - *newDistributedObjects = list_concat(*newDistributedObjects, - addedDependencies); - *newDistributedObjects = lappend(*newDistributedObjects, sequenceAddress); + ObjectAddress sequenceAddress; + ObjectAddressSet(sequenceAddress, RelationRelationId, sequenceOid); + + /* + * We only mark these new objects as distributed in pg_dist_object + * locally. We cannot do it on all nodes with metadata syncing + * enabled. It would error out for the node we're currently + * syncing, because it's metadata is not up to date. + * + * Instead we rely on the initial sync of the pg_dist_object + * contents at the end of this function. By inserting it here + * locally, it will become part of that sync automatically. + * + * The only downside of this approach is that it won't be synced to + * other metadata nodes than the current one. This should not be a + * problem however, because this marking as distributed only does + * something for the very first metadata worker that is added. For + * future nodes these rows in pg_dist_object would have either + * already been added when metadata was synced to them or when the + * distributed table was created. + */ + EnsureDependenciesExistOnAllNodesLocalOnlyMarkDistributed(&sequenceAddress); + bool localOnly = true; + MarkObjectDistributed(&sequenceAddress, localOnly); } List *workerSequenceDDLCommands = SequenceDDLCommandsForTable(relationId); diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 1267d245f..dc71a88e8 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -31,7 +31,7 @@ typedef enum extern void StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort); extern bool ClusterHasKnownMetadataWorkers(void); extern bool ShouldSyncTableMetadata(Oid relationId); -extern List * MetadataCreateCommands(List **newDistributedObjects); +extern List * MetadataCreateCommands(); extern List * MetadataDropCommands(void); extern char * DistributedObjectCreateCommand(const ObjectAddress *address, int32 *distributionArgumentIndex, diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 3f9fbd090..eb5d80b73 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -251,6 +251,9 @@ extern void CreateTruncateTrigger(Oid relationId); extern TableConversionReturn * UndistributeTable(TableConversionParameters *params); extern void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target); +extern void EnsureDependenciesExistOnAllNodesLocalOnlyMarkDistributed(const + ObjectAddress * + target); extern List * GetDistributableDependenciesForObject(const ObjectAddress *target); extern bool ShouldPropagate(void); extern bool ShouldPropagateObject(const ObjectAddress *address);