Combine metadata logic

velioglu/wo_seq_test_1
Burak Velioglu 2022-01-20 17:03:50 +03:00
parent 632d7b8038
commit da3a0f4cdd
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
5 changed files with 83 additions and 138 deletions

View File

@ -372,7 +372,7 @@ GetCitusTableDDLCommandList(Oid relationId)
* clusterHasDistributedFunction if there are any distributed functions.
*/
List *
ReplicateAllDependenciesToNodeCommandList(const char *nodeName, int nodePort)
ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort)
{
/* since we are executing ddl commands disable propagation first, primarily for mx */
List *ddlCommands = list_make1(DISABLE_DDL_PROPAGATION);

View File

@ -107,10 +107,8 @@ static void InsertPlaceholderCoordinatorRecord(void);
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata
*nodeMetadata);
static void DeleteNodeRow(char *nodename, int32 nodeport);
static List * MetadataSetupCommandList();
static List * ClearMetadataCommandList();
static List * ClearShellTablesCommandList();
static void SetUpDistributedTableWithDependencies(WorkerNode *workerNode);
static List * PgDistMetadataSyncCommandList();
static void SyncTableMetadataToNode(WorkerNode *workerNode);
static List * InterTableRelationshipCommandList();
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
static List * PropagateNodeWideObjectsCommandList();
@ -630,11 +628,11 @@ InterTableRelationshipCommandList()
/*
* MetadataSetupCommandList get the command list to set up the metadata
* depending on the distributed object on the given node.
* PgDistMetadataSyncCommandList returns the command list to sync the pg_dist_*
* metadata.
*/
static List *
MetadataSetupCommandList()
PgDistMetadataSyncCommandList()
{
List *distributedTableList = CitusTableList();
List *propagatedTableList = NIL;
@ -695,27 +693,6 @@ MetadataSetupCommandList()
}
/*
* ClearShellTablesCommandList returns the command list to clear (shell) distributed
* tables from the given node.
*/
static List *
ClearShellTablesCommandList()
{
List *clearShellTablesCommandList = NIL;
clearShellTablesCommandList = lappend(clearShellTablesCommandList,
REMOVE_ALL_CLUSTERED_TABLES_ONLY_COMMAND);
clearShellTablesCommandList = list_concat(list_make1(DISABLE_DDL_PROPAGATION),
clearShellTablesCommandList);
clearShellTablesCommandList = list_concat(clearShellTablesCommandList,
list_make1(ENABLE_DDL_PROPAGATION));
return clearShellTablesCommandList;
}
/*
* PropagateNodeWideObjectsCommandList is called during node activation to
* propagate any object that should be propagated for every node. These are
@ -749,94 +726,68 @@ PropagateNodeWideObjectsCommandList()
/*
* RecreateDistributedTablesWithDependenciesCommandList return command list to recreate
* distributed tables with command list.
* SyncTableMetadataCommandList returns commands to sync table metadata to the
* given worker node. To be idempotent, it first drops the ones required to be
* dropped.
*
* Table metadata includes:
*
* - All dependencies (e.g., types, schemas, sequences)
* - All shell distributed tables
* - pg_dist_partition, pg_dist_shard, pg_dist_placement, pg_dist_object
*
*/
List *
RecreateDistributedTablesWithDependenciesCommandList(WorkerNode *workerNode)
SyncTableMetadataCommandList(WorkerNode *workerNode)
{
List *commandList = NIL;
/*
* Remove shell tables and pg_dist_* metadata.
*/
commandList = list_concat(commandList, DetachPartitionCommandList());
commandList = list_concat(commandList, ClearShellTablesCommandList());
commandList = lappend(commandList, REMOVE_ALL_CITUS_TABLES_COMMAND);
commandList = lappend(commandList, DELETE_ALL_DISTRIBUTED_OBJECTS);
/*
* Propagate node wide objects. It includes only roles for now.
*/
commandList = list_concat(commandList, PropagateNodeWideObjectsCommandList());
commandList = list_concat(commandList, list_make1(LocalGroupIdUpdateCommand(
workerNode->groupId)));
commandList = list_concat(commandList,
ReplicateAllDependenciesToNodeCommandList(
workerNode->workerName,
workerNode->workerPort));
commandList = list_concat(commandList,
InterTableRelationshipCommandList());
/*
* Replicate all objects of the pg_dist_object to the remote node. We need to
* update local group id first, as sequence replication logic depends on it.
*/
commandList = list_concat(commandList, list_make1(LocalGroupIdUpdateCommand(workerNode->groupId)));
commandList = list_concat(commandList, ReplicateAllObjectsToNodeCommandList(workerNode->workerName, workerNode->workerPort));
/*
* After creating each table, handle the inter table relationship between
* those tables.
*/
commandList = list_concat(commandList, InterTableRelationshipCommandList());
/*
* Finally create pg_dist_* entries
*/
List *syncPgDistMetadataCommandList = PgDistMetadataSyncCommandList();
commandList = list_concat(commandList, syncPgDistMetadataCommandList);
return commandList;
}
/*
* ClearMetadataCommandList returns the command list to clear all the
* distributed objects and metadata from the given node.
*/
static List *
ClearMetadataCommandList()
{
List *clearDistTableInfoCommandList = NIL;
clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList,
DELETE_ALL_PARTITIONS);
clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList,
DELETE_ALL_SHARDS);
clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList,
DELETE_ALL_PLACEMENTS);
clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList,
DELETE_ALL_DISTRIBUTED_OBJECTS);
clearDistTableInfoCommandList = list_concat(list_make1(DISABLE_DDL_PROPAGATION),
clearDistTableInfoCommandList);
clearDistTableInfoCommandList = list_concat(clearDistTableInfoCommandList, list_make1(
ENABLE_DDL_PROPAGATION));
return clearDistTableInfoCommandList;
}
/*
* ResyncMetadataCommandList returns the command list to resync all the
* distributed table related metadata.
*/
List *
ResyncMetadataCommandList(void)
{
List *resyncMetadataCommandList = NIL;
List *clearMetadataCommandList = ClearMetadataCommandList();
resyncMetadataCommandList = list_concat(resyncMetadataCommandList,
clearMetadataCommandList);
List *setupMetadataCommandList = MetadataSetupCommandList();
resyncMetadataCommandList = list_concat(resyncMetadataCommandList,
setupMetadataCommandList);
return resyncMetadataCommandList;
}
/*
* SetUpDistributedTableWithDependencies sets up up the following on a node if it's
* a primary node that currently stores data:
* SyncTableMetadataToNode sync the table metadata to the node. Metadata includes
* - All dependencies (e.g., types, schemas, sequences)
* - All shell distributed table
* - Reference tables, because they are needed to handle queries efficiently.
* - Distributed functions
* - pg_dist_partition, pg_dist_shard, pg_dist_placement, pg_dist_object
*
* Note that we do not create the distributed dependencies on the coordinator
* since all the dependencies should be present in the coordinator already.
*/
static void
SetUpDistributedTableWithDependencies(WorkerNode *newWorkerNode)
SyncTableMetadataToNode(WorkerNode *newWorkerNode)
{
if (NodeIsPrimary(newWorkerNode))
{
@ -845,8 +796,7 @@ SetUpDistributedTableWithDependencies(WorkerNode *newWorkerNode)
Assert(ShouldPropagate());
if (!NodeIsCoordinator(newWorkerNode))
{
List *commandList = RecreateDistributedTablesWithDependenciesCommandList(
newWorkerNode);
List *commandList = SyncTableMetadataCommandList(newWorkerNode);
/* send commands to new workers, the current user should be a superuser */
Assert(superuser());
@ -856,12 +806,6 @@ SetUpDistributedTableWithDependencies(WorkerNode *newWorkerNode)
CurrentUserName(),
commandList);
}
if (ReplicateReferenceTablesOnActivate)
{
ReplicateAllReferenceTablesToNode(newWorkerNode->workerName,
newWorkerNode->workerPort);
}
}
}
@ -1080,12 +1024,6 @@ ActivateNode(char *nodeName, int nodePort)
*/
EnsureSuperUser();
if (!EnableDependencyCreation)
{
ereport(ERROR, (errmsg("citus.enable_object_propagation must be on to "
"add an active node")));
}
/* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
LockRelationOid(DistNodeRelationId(), ExclusiveLock);
@ -1122,26 +1060,38 @@ ActivateNode(char *nodeName, int nodePort)
/* TODO: Once all tests will be enabled for MX, we can remove sync by default check */
bool syncMetadata = EnableMetadataSyncByDefault && NodeIsPrimary(workerNode);
if (syncMetadata)
if (syncMetadata && EnableDependencyCreation)
{
/*
* We are going to sync the metadata anyway in this transaction, so do
* not fail just because the current metadata is not synced.
*/
SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced,
BoolGetDatum(true));
/*
* Sync table metadata first. Please check the comment on SyncTableMetadataToNode
* for the definition of table metadata.
*/
SyncTableMetadataToNode(workerNode);
/*
* We need to replicate reference tables before syncing node metadata, otherwise
* reference table replication logic would try to get lock on the new node before
* having the shard placement on it
*/
if (ReplicateReferenceTablesOnActivate)
{
ReplicateAllReferenceTablesToNode(workerNode->workerName,
workerNode->workerPort);
}
/*
* Sync node metadata (pg_dist_node) finally.
*/
SyncNodeMetadataToNode(nodeName, nodePort);
}
SetUpDistributedTableWithDependencies(workerNode);
if (syncMetadata && !NodeIsCoordinator(workerNode))
{
List *metadataUpdateCommandList = ResyncMetadataCommandList();
/* send commands to the new worker, the current user should be a superuser */
Assert(superuser());
SendMetadataCommandListToWorkerInCoordinatedTransaction(
workerNode->workerName,
workerNode->workerPort,
CurrentUserName(),
metadataUpdateCommandList);
}
/* finally, let all other active metadata nodes to learn about this change */
WorkerNode *newWorkerNode = SetNodeState(nodeName, nodePort, isActive);
Assert(newWorkerNode->nodeId == workerNode->nodeId);

View File

@ -48,22 +48,18 @@ activate_node_snapshot(PG_FUNCTION_ARGS)
*/
WorkerNode *dummyWorkerNode = GetFirstPrimaryWorkerNode();
List *recreateTablesCommands = RecreateDistributedTablesWithDependenciesCommandList(
dummyWorkerNode);
List *syncTableMetadataCommandList = SyncTableMetadataCommandList(dummyWorkerNode);
List *dropSnapshotCommands = NodeMetadataDropCommands();
List *createSnapshotCommands = NodeMetadataCreateCommands();
List *metadataUpdateCommandList = ResyncMetadataCommandList();
List *activateNodeCommandList = NIL;
int activateNodeCommandIndex = 0;
Oid ddlCommandTypeId = TEXTOID;
activateNodeCommandList = list_concat(activateNodeCommandList,
recreateTablesCommands);
syncTableMetadataCommandList);
activateNodeCommandList = list_concat(activateNodeCommandList, dropSnapshotCommands);
activateNodeCommandList = list_concat(activateNodeCommandList,
createSnapshotCommands);
activateNodeCommandList = list_concat(activateNodeCommandList,
metadataUpdateCommandList);
activateNodeCommandList = list_concat(activateNodeCommandList, createSnapshotCommands);
int activateNodeCommandCount = list_length(activateNodeCommandList);
Datum *activateNodeCommandDatumArray = palloc0(activateNodeCommandCount *

View File

@ -249,8 +249,7 @@ extern void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target);
extern List * GetDistributableDependenciesForObject(const ObjectAddress *target);
extern bool ShouldPropagate(void);
extern bool ShouldPropagateObject(const ObjectAddress *address);
extern List * ReplicateAllDependenciesToNodeCommandList(const char *nodeName, int
nodePort);
extern List * ReplicateAllObjectsToNodeCommandList(const char *nodeName, int nodePort);
/* Remaining metadata utility functions */
extern char * TableOwner(Oid relationId);

View File

@ -105,7 +105,7 @@ extern uint32 CountPrimariesWithMetadata(void);
extern WorkerNode * GetFirstPrimaryWorkerNode(void);
extern List * RecreateDistributedTablesWithDependenciesCommandList(
WorkerNode *workerNode);
extern List * ResyncMetadataCommandList(void);
extern List * SyncTableMetadataCommandList(WorkerNode *workerNode);
/* Function declarations for worker node utilities */
extern int CompareWorkerNodes(const void *leftElement, const void *rightElement);