Start moving table dependent metadata

velioglu/wo_seq_test_1
Burak Velioglu 2021-12-15 13:58:21 +03:00
parent 14f8cd5a75
commit fea68a43ad
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
3 changed files with 155 additions and 129 deletions

View File

@ -64,6 +64,7 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
List *dependencyCommands = GetDependencyCreateDDLCommands(dependency);
ddlCommands = list_concat(ddlCommands, dependencyCommands);
// TODO: Might add check for tables
/* create a new list with dependencies that actually created commands */
if (list_length(dependencyCommands) > 0)
{
@ -237,19 +238,20 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
if (relKind == RELKIND_RELATION)
{
Oid relationId = dependency->objectId;
List *commandList = NIL;
List *tableDDLCommands = GetFullTableCreationCommands(
dependency->objectId,
WORKER_NEXTVAL_SEQUENCE_DEFAULTS);
List *tableDDLCommands = GetFullTableCreationCommands(relationId, WORKER_NEXTVAL_SEQUENCE_DEFAULTS);
TableDDLCommand *tableDDLCommand = NULL;
foreach_ptr(tableDDLCommand, tableDDLCommands)
{
Assert(CitusIsA(tableDDLCommand, TableDDLCommand));
commandList = lappend(commandList, GetTableDDLCommand(
tableDDLCommand));
commandList = lappend(commandList, GetTableDDLCommand(tableDDLCommand));
}
List *sequenceDependencyCommandList = SequenceDependencyCommandList(dependency->objectId);
commandList = list_concat(commandList, sequenceDependencyCommandList);
return commandList;
}

View File

@ -92,7 +92,6 @@ static char * LocalGroupIdUpdateCommand(int32 groupId);
static char * TruncateTriggerCreateCommand(Oid relationId);
static char * SchemaOwnerName(Oid objectId);
static bool HasMetadataWorkers(void);
static List * DetachPartitionCommandList(void);
static bool ShouldSyncTableMetadataInternal(bool hashDistributed,
bool citusTableWithNoDistKey);
static bool SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError);
@ -522,18 +521,11 @@ DropMetadataSnapshotOnNode(WorkerNode *workerNode)
* following queries:
*
* (i) Query that populates pg_dist_node table
* (ii) Queries that create the foreign keys and partitioning hierarchy
* (iii) Queries that populate pg_dist_partition table referenced by (ii)
* (iv) Queries that populate pg_dist_shard table referenced by (iii)
* (v) Queries that populate pg_dist_placement table referenced by (iv)
* (vi) Queries that populate pg_dist_object table
*/
List *
MetadataCreateCommands(void)
{
List *metadataSnapshotCommandList = NIL;
List *distributedTableList = CitusTableList();
List *propagatedTableList = NIL;
bool includeNodesFromOtherClusters = true;
List *workerNodeList = ReadDistNode(includeNodesFromOtherClusters);
@ -545,49 +537,6 @@ MetadataCreateCommands(void)
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
nodeListInsertCommand);
/* create the list of tables whose metadata will be created */
CitusTableCacheEntry *cacheEntry = NULL;
foreach_ptr(cacheEntry, distributedTableList)
{
if (ShouldSyncTableMetadata(cacheEntry->relationId))
{
propagatedTableList = lappend(propagatedTableList, cacheEntry);
}
}
/* create the metadata */
foreach_ptr(cacheEntry, propagatedTableList)
{
Oid clusteredTableId = cacheEntry->relationId;
/* add the table metadata command first*/
char *metadataCommand = DistributionCreateCommand(cacheEntry);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
metadataCommand);
/* add the truncate trigger command after the table became distributed */
char *truncateTriggerCreateCommand =
TruncateTriggerCreateCommand(cacheEntry->relationId);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
truncateTriggerCreateCommand);
/* add the pg_dist_shard{,placement} entries */
List *shardIntervalList = LoadShardIntervalList(clusteredTableId);
List *shardCreateCommandList = ShardListInsertCommand(shardIntervalList);
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
shardCreateCommandList);
}
/* As the last step, propagate the pg_dist_object entities */
if (ShouldPropagate())
{
List *distributedObjectSyncCommandList =
DistributedObjectMetadataSyncCommandList();
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
distributedObjectSyncCommandList);
}
return metadataSnapshotCommandList;
}
@ -739,33 +688,17 @@ GetDistributedTableDDLEvents(Oid relationId)
/*
* MetadataDropCommands returns list of queries that are required to
* drop all the metadata of the node that are related to clustered tables.
* drop all the metadata of the node that are not related to clustered tables.
* The drop metadata snapshot commands includes the following queries:
*
* (i) Query to disable DDL propagation (necessary for (ii)
* (ii) Queries that DETACH all partitions of distributed tables
* (iii) Queries that delete all the rows from pg_dist_node table
* (iv) Queries that drop the clustered tables and remove its references from
* the pg_dist_partition. Note that distributed relation ids are gathered
* from the worker itself to prevent dropping any non-distributed tables
* with the same name.
* (v) Queries that delete all the rows from pg_dist_shard table referenced by (iv)
* (vi) Queries that delete all the rows from pg_dist_placement table
* referenced by (v)
* (vii) Queries that delete all the rows from pg_dist_object table
* (i) Queries that delete all the rows from pg_dist_node table
*/
List *
MetadataDropCommands(void)
{
List *dropSnapshotCommandList = NIL;
List *detachPartitionCommandList = DetachPartitionCommandList();
dropSnapshotCommandList = list_concat(dropSnapshotCommandList,
detachPartitionCommandList);
dropSnapshotCommandList = lappend(dropSnapshotCommandList, DELETE_ALL_NODES);
dropSnapshotCommandList = lappend(dropSnapshotCommandList,
DELETE_ALL_DISTRIBUTED_OBJECTS);
return dropSnapshotCommandList;
}
@ -1984,55 +1917,6 @@ CreateTableMetadataOnWorkers(Oid relationId)
}
/*
* DetachPartitionCommandList returns list of DETACH commands to detach partitions
* of all distributed tables. This function is used for detaching partitions in MX
* workers before DROPping distributed partitioned tables in them. Thus, we are
* disabling DDL propagation to the beginning of the commands (we are also enabling
* DDL propagation at the end of command list to swtich back to original state). As
* an extra step, if there are no partitions to DETACH, this function simply returns
* empty list to not disable/enable DDL propagation for nothing.
*/
static List *
DetachPartitionCommandList(void)
{
List *detachPartitionCommandList = NIL;
List *distributedTableList = CitusTableList();
/* we iterate over all distributed partitioned tables and DETACH their partitions */
CitusTableCacheEntry *cacheEntry = NULL;
foreach_ptr(cacheEntry, distributedTableList)
{
if (!PartitionedTable(cacheEntry->relationId))
{
continue;
}
List *partitionList = PartitionList(cacheEntry->relationId);
List *detachCommands =
GenerateDetachPartitionCommandRelationIdList(partitionList);
detachPartitionCommandList = list_concat(detachPartitionCommandList,
detachCommands);
}
if (list_length(detachPartitionCommandList) == 0)
{
return NIL;
}
detachPartitionCommandList =
lcons(DISABLE_DDL_PROPAGATION, detachPartitionCommandList);
/*
* We probably do not need this but as an extra precaution, we are enabling
* DDL propagation to switch back to original state.
*/
detachPartitionCommandList = lappend(detachPartitionCommandList,
ENABLE_DDL_PROPAGATION);
return detachPartitionCommandList;
}
/*
* SyncMetadataToNodes tries recreating the metadata snapshot in the

View File

@ -91,6 +91,7 @@ typedef struct NodeMetadata
} NodeMetadata;
/* local function forward declarations */
static List * DetachPartitionCommandList(void);
static int ActivateNode(char *nodeName, int nodePort);
static void RemoveNodeFromCluster(char *nodeName, int32 nodePort);
static void ErrorIfNodeContainsNonRemovablePlacements(WorkerNode *workerNode);
@ -107,6 +108,8 @@ static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetada
*nodeMetadata);
static void DeleteNodeRow(char *nodename, int32 nodeport);
static void SetUpSequenceDependencies(WorkerNode *workerNode);
static void SetUpObjectMetadata(WorkerNode *workerNode);
static void ClearDistributedTablesOnNode(WorkerNode *workerNode);
static void SetUpDistributedTableWithDependencies(WorkerNode *workerNode);
static void SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode);
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
@ -678,17 +681,100 @@ SetUpSequenceDependencies(WorkerNode *workerNode)
List *sequenceDependencyCommandList = SequenceDependencyCommandList(relationId);
sequenceCommandList = list_concat(sequenceCommandList,
sequenceDependencyCommandList);
}
/* prevent recursive propagation */
SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
/* send the commands one by one */
const char *command = NULL;
foreach_ptr(command, sequenceCommandList)
char *currentUser = CurrentUserName();
SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,
workerNode->workerPort,
currentUser,
sequenceCommandList);
}
static void
SetUpObjectMetadata(WorkerNode *workerNode)
{
List *distributedTableList = CitusTableList();
List *propagatedTableList = NIL;
List *metadataSnapshotCommandList = NIL;
/* create the list of tables whose metadata will be created */
CitusTableCacheEntry *cacheEntry = NULL;
foreach_ptr(cacheEntry, distributedTableList)
{
SendCommandToWorkersWithMetadata(command);
if (ShouldSyncTableMetadata(cacheEntry->relationId))
{
propagatedTableList = lappend(propagatedTableList, cacheEntry);
}
}
/* after all tables are created, create the metadata */
foreach_ptr(cacheEntry, propagatedTableList)
{
Oid clusteredTableId = cacheEntry->relationId;
/* add the table metadata command first*/
char *metadataCommand = DistributionCreateCommand(cacheEntry);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
metadataCommand);
/* add the truncate trigger command after the table became distributed */
char *truncateTriggerCreateCommand =
TruncateTriggerCreateCommand(cacheEntry->relationId);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
truncateTriggerCreateCommand);
/* add the pg_dist_shard{,placement} entries */
List *shardIntervalList = LoadShardIntervalList(clusteredTableId);
List *shardCreateCommandList = ShardListInsertCommand(shardIntervalList);
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
shardCreateCommandList);
}
/* As the last step, propagate the pg_dist_object entities */
if (ShouldPropagate())
{
List *distributedObjectSyncCommandList =
DistributedObjectMetadataSyncCommandList();
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
distributedObjectSyncCommandList);
}
List *metadataSnapshotCommands = list_make2(DISABLE_DDL_PROPAGATION, metadataSnapshotCommandList);
char *currentUser = CurrentUserName();
SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,
workerNode->workerPort,
currentUser,
metadataSnapshotCommands);
}
static void
ClearDistributedTablesOnNode(WorkerNode *workerNode)
{
List *clearDistTableInfoCommandList = NIL;
List *detachPartitionCommandList = DetachPartitionCommandList();
clearDistTableInfoCommandList = list_concat(clearDistTableInfoCommandList,
detachPartitionCommandList);
clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList,
REMOVE_ALL_CLUSTERED_TABLES_COMMAND);
clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList, DELETE_ALL_DISTRIBUTED_OBJECTS);
List *clearDistTableCommands = list_make2(DISABLE_DDL_PROPAGATION,
clearDistTableInfoCommandList);
char *currentUser = CurrentUserName();
SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,
workerNode->workerPort,
currentUser,
clearDistTableCommands);
}
@ -1013,9 +1099,11 @@ ActivateNode(char *nodeName, int nodePort)
BoolGetDatum(isActive));
}
ClearDistributedTablesOnNode(workerNode);
SetUpDistributedTableWithDependencies(workerNode);
SetUpSequenceDependencies(workerNode);
SetUpMultipleDistributedTableIntegrations(workerNode);
SetUpObjectMetadata(workerNode);
if (syncMetadata)
{
@ -1030,6 +1118,58 @@ ActivateNode(char *nodeName, int nodePort)
}
/*
* DetachPartitionCommandList returns list of DETACH commands to detach partitions
* of all distributed tables. This function is used for detaching partitions in MX
* workers before DROPping distributed partitioned tables in them. Thus, we are
* disabling DDL propagation to the beginning of the commands (we are also enabling
* DDL propagation at the end of command list to swtich back to original state). As
* an extra step, if there are no partitions to DETACH, this function simply returns
* empty list to not disable/enable DDL propagation for nothing.
*/
static List *
DetachPartitionCommandList(void)
{
List *detachPartitionCommandList = NIL;
List *distributedTableList = CitusTableList();
/* we iterate over all distributed partitioned tables and DETACH their partitions */
CitusTableCacheEntry *cacheEntry = NULL;
foreach_ptr(cacheEntry, distributedTableList)
{
if (!PartitionedTable(cacheEntry->relationId))
{
continue;
}
List *partitionList = PartitionList(cacheEntry->relationId);
List *detachCommands =
GenerateDetachPartitionCommandRelationIdList(partitionList);
detachPartitionCommandList = list_concat(detachPartitionCommandList,
detachCommands);
}
if (list_length(detachPartitionCommandList) == 0)
{
return NIL;
}
detachPartitionCommandList =
lcons(DISABLE_DDL_PROPAGATION, detachPartitionCommandList);
/*
* We probably do not need this but as an extra precaution, we are enabling
* DDL propagation to switch back to original state.
*/
detachPartitionCommandList = lappend(detachPartitionCommandList,
ENABLE_DDL_PROPAGATION);
return detachPartitionCommandList;
}
/*
* citus_update_node moves the requested node to a different nodename and nodeport. It
* locks to ensure no queries are running concurrently; and is intended for customers who