Simplify metadata snapshot syncing

pg_dist_object-metadatasync
Jelte Fennema 2021-07-22 16:05:02 +02:00
parent 11f8f1f046
commit 19d61c6bcd
4 changed files with 75 additions and 53 deletions

View File

@ -31,26 +31,56 @@ typedef bool (*AddressPredicate)(const ObjectAddress *);
static List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency); static List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency);
static List * FilterObjectAddressListByPredicate(List *objectAddressList, static List * FilterObjectAddressListByPredicate(List *objectAddressList,
AddressPredicate predicate); AddressPredicate predicate);
static void EnsureDependenciesExistOnAllNodesInternal(const ObjectAddress *target,
bool localOnlyMarkDistributed);
bool EnableDependencyCreation = true; bool EnableDependencyCreation = true;
/* /*
* EnsureDependenciesExistOnAllNodes finds all the dependencies that we support and makes * EnsureDependenciesExistOnAllNodes finds all the dependencies that we support
* sure these are available on all workers. If not available they will be created on the * and makes sure these are available on all workers. If not available they
* workers via a separate session that will be committed directly so that the objects are * will be created on the workers via a separate session that will be committed
* visible to potentially multiple sessions creating the shards. * directly so that the objects are visible to potentially multiple sessions
* creating the shards. After making the objects available it also marks them
* as distributed.
* *
* Note; only the actual objects are created via a separate session, the local records to * Note; only the actual objects are created via a separate session, the
* pg_dist_object are created in this session. As a side effect the objects could be * records to pg_dist_object are created in this session. As a side effect the
* created on the workers without a catalog entry on the coordinator. Updates to the * objects could be created on the workers without a catalog entry on the
* objects on the coordinator are not propagated to the workers until the record is * coordinator. Updates to the objects on the coordinator are not propagated to
* visible on the coordinator. * the workers until the record is visible on the coordinator.
* *
* This is solved by creating the dependencies in an idempotent manner, either via * This is solved by creating the dependencies in an idempotent manner, either via
* postgres native CREATE IF NOT EXISTS, or citus helper functions. * postgres native CREATE IF NOT EXISTS, or citus helper functions.
*/ */
List * void
EnsureDependenciesExistOnAllNodesWithoutMarkingDistributed(const ObjectAddress *target) EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
{
EnsureDependenciesExistOnAllNodesInternal(target, false);
}
/*
* EnsureDependenciesExistOnAllNodesLocalOnlyMarkDistributed is the same as
* EnsureDependenciesExistOnAllNodes, except that it only marks objects
* distributed on the local node. This should never be used, except during
* initial metadata syncing.
*/
void
EnsureDependenciesExistOnAllNodesLocalOnlyMarkDistributed(const ObjectAddress *target)
{
EnsureDependenciesExistOnAllNodesInternal(target, true);
}
/*
* EnsureDependenciesExistOnAllNodesInternal is the internal function used by
* EnsureDependenciesExistOnAllNodes and
* EnsureDependenciesExistOnAllNodesLocalOnlyMarkDistributed.
*/
static void
EnsureDependenciesExistOnAllNodesInternal(const ObjectAddress *target,
bool localOnlyMarkDistributed)
{ {
List *dependenciesWithCommands = NIL; List *dependenciesWithCommands = NIL;
List *ddlCommands = NULL; List *ddlCommands = NULL;
@ -69,10 +99,11 @@ EnsureDependenciesExistOnAllNodesWithoutMarkingDistributed(const ObjectAddress *
dependenciesWithCommands = lappend(dependenciesWithCommands, dependency); dependenciesWithCommands = lappend(dependenciesWithCommands, dependency);
} }
} }
if (list_length(ddlCommands) <= 0) if (list_length(ddlCommands) <= 0)
{ {
/* no ddl commands to be executed */ /* no ddl commands to be executed */
return NIL; return;
} }
/* since we are executing ddl commands lets disable propagation, primarily for mx */ /* since we are executing ddl commands lets disable propagation, primarily for mx */
@ -102,27 +133,15 @@ EnsureDependenciesExistOnAllNodesWithoutMarkingDistributed(const ObjectAddress *
ddlCommands); ddlCommands);
} }
return dependenciesWithCommands;
}
void
EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
{
List *dependenciesWithCommands =
EnsureDependenciesExistOnAllNodesWithoutMarkingDistributed(target);
/* /*
* It is possible to create distributed tables which depend on other dependencies * It is possible to create distributed tables which depend on other dependencies
* before any node is in the cluster. If we would wait till we actually had connected * before any node is in the cluster. If we would wait till we actually had connected
* to the nodes before marking the objects as distributed these objects would never be * to the nodes before marking the objects as distributed these objects would never be
* created on the workers when they get added, causing shards to fail to create. * created on the workers when they get added, causing shards to fail to create.
*/ */
ObjectAddress *dependency = NULL;
foreach_ptr(dependency, dependenciesWithCommands) foreach_ptr(dependency, dependenciesWithCommands)
{ {
bool localOnly = false; MarkObjectDistributed(dependency, localOnlyMarkDistributed);
MarkObjectDistributed(dependency, localOnly);
} }
} }

View File

@ -358,10 +358,8 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
/* generate the queries which drop the metadata */ /* generate the queries which drop the metadata */
List *dropMetadataCommandList = MetadataDropCommands(); List *dropMetadataCommandList = MetadataDropCommands();
List *newDistributedObjects = NIL;
/* generate the queries which create the metadata from scratch */ /* generate the queries which create the metadata from scratch */
List *createMetadataCommandList = MetadataCreateCommands(&newDistributedObjects); List *createMetadataCommandList = MetadataCreateCommands();
List *recreateMetadataSnapshotCommandList = list_make1(localGroupIdUpdateCommand); List *recreateMetadataSnapshotCommandList = list_make1(localGroupIdUpdateCommand);
recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList, recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList,
@ -394,14 +392,8 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
return false; return false;
} }
} }
MarkNodeMetadataSynced(workerNode->workerName, workerNode->workerPort, true);
ObjectAddress *address; MarkNodeMetadataSynced(workerNode->workerName, workerNode->workerPort, true);
foreach_ptr(address, newDistributedObjects)
{
bool localOnly = false;
MarkObjectDistributed(address, localOnly);
}
return true; return true;
} }
@ -441,17 +433,9 @@ DropMetadataSnapshotOnNode(WorkerNode *workerNode)
* (iv) Queries that populate pg_dist_shard table referenced by (iii) * (iv) Queries that populate pg_dist_shard table referenced by (iii)
* (v) Queries that populate pg_dist_placement table referenced by (iv) * (v) Queries that populate pg_dist_placement table referenced by (iv)
* (vi) Queries that populate pg_dist_object * (vi) Queries that populate pg_dist_object
*
* The first argument will be filled with a list of objects that are after the
* returned commands have been run. These objects should then be marked
* distributed afterwards. This function does not mark them as distributed
* directly. The reason for that is marking as distributed is also done on the
* metadata workers, and thus can only be done safely once all nodes their
* metadata is up to date. Since this function is only called when at least one
* node is out of sync, doing it in this function would always fail.
*/ */
List * List *
MetadataCreateCommands(List **newDistributedObjects) MetadataCreateCommands()
{ {
List *metadataSnapshotCommandList = NIL; List *metadataSnapshotCommandList = NIL;
List *distributedTableList = CitusTableList(); List *distributedTableList = CitusTableList();
@ -513,14 +497,30 @@ MetadataCreateCommands(List **newDistributedObjects)
Oid sequenceOid; Oid sequenceOid;
foreach_oid(sequenceOid, dependentSequenceList) foreach_oid(sequenceOid, dependentSequenceList)
{ {
ObjectAddress *sequenceAddress = palloc(sizeof(ObjectAddress)); ObjectAddress sequenceAddress;
ObjectAddressSet(*sequenceAddress, RelationRelationId, sequenceOid); ObjectAddressSet(sequenceAddress, RelationRelationId, sequenceOid);
List *addedDependencies =
EnsureDependenciesExistOnAllNodesWithoutMarkingDistributed( /*
sequenceAddress); * We only mark these new objects as distributed in pg_dist_object
*newDistributedObjects = list_concat(*newDistributedObjects, * locally. We cannot do it on all nodes with metadata syncing
addedDependencies); * enabled. It would error out for the node we're currently
*newDistributedObjects = lappend(*newDistributedObjects, sequenceAddress); * syncing, because it's metadata is not up to date.
*
* Instead we rely on the initial sync of the pg_dist_object
* contents at the end of this function. By inserting it here
* locally, it will become part of that sync automatically.
*
* The only downside of this approach is that it won't be synced to
* other metadata nodes than the current one. This should not be a
* problem however, because this marking as distributed only does
* something for the very first metadata worker that is added. For
* future nodes these rows in pg_dist_object would have either
* already been added when metadata was synced to them or when the
* distributed table was created.
*/
EnsureDependenciesExistOnAllNodesLocalOnlyMarkDistributed(&sequenceAddress);
bool localOnly = true;
MarkObjectDistributed(&sequenceAddress, localOnly);
} }
List *workerSequenceDDLCommands = SequenceDDLCommandsForTable(relationId); List *workerSequenceDDLCommands = SequenceDDLCommandsForTable(relationId);

View File

@ -31,7 +31,7 @@ typedef enum
extern void StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort); extern void StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort);
extern bool ClusterHasKnownMetadataWorkers(void); extern bool ClusterHasKnownMetadataWorkers(void);
extern bool ShouldSyncTableMetadata(Oid relationId); extern bool ShouldSyncTableMetadata(Oid relationId);
extern List * MetadataCreateCommands(List **newDistributedObjects); extern List * MetadataCreateCommands();
extern List * MetadataDropCommands(void); extern List * MetadataDropCommands(void);
extern char * DistributedObjectCreateCommand(const ObjectAddress *address, extern char * DistributedObjectCreateCommand(const ObjectAddress *address,
int32 *distributionArgumentIndex, int32 *distributionArgumentIndex,

View File

@ -251,6 +251,9 @@ extern void CreateTruncateTrigger(Oid relationId);
extern TableConversionReturn * UndistributeTable(TableConversionParameters *params); extern TableConversionReturn * UndistributeTable(TableConversionParameters *params);
extern void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target); extern void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target);
extern void EnsureDependenciesExistOnAllNodesLocalOnlyMarkDistributed(const
ObjectAddress *
target);
extern List * GetDistributableDependenciesForObject(const ObjectAddress *target); extern List * GetDistributableDependenciesForObject(const ObjectAddress *target);
extern bool ShouldPropagate(void); extern bool ShouldPropagate(void);
extern bool ShouldPropagateObject(const ObjectAddress *address); extern bool ShouldPropagateObject(const ObjectAddress *address);