metadata_sync_imp
Onder Kalaci 2022-11-28 19:25:00 +01:00
parent 93a446254c
commit 23775a861b
8 changed files with 232 additions and 213 deletions

View File

@ -532,15 +532,8 @@ GetAllDependencyCreateDDLCommands(const List *dependencies)
* clusterHasDistributedFunction if there are any distributed functions.
*/
void
ReplicateAllObjectsToNodeCommandList(List *nodeToSyncMetadataConnections,
List **ddlCommands)
ReplicateAllObjectsToNodeCommandList(MetadataSyncContext syncContext)
{
/* since we are executing ddl commands disable propagation first, primarily for mx */
if (ddlCommands != NULL)
{
*ddlCommands = list_make1(DISABLE_DDL_PROPAGATION);
}
/*
* collect all dependencies in creation order and get their ddl commands
*/
@ -582,30 +575,23 @@ ReplicateAllObjectsToNodeCommandList(List *nodeToSyncMetadataConnections,
continue;
}
List *perObjCommands = GetDependencyCreateDDLCommands(dependency);
if (list_length(nodeToSyncMetadataConnections) != 0)
if (syncContext.syncImmediately)
{
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(
nodeToSyncMetadataConnections),
syncContext.
nodeConnectionList),
perObjCommands);
}
if (ddlCommands != NULL)
{
*ddlCommands = list_concat(*ddlCommands,
perObjCommands);
}
else
{
list_free_deep(perObjCommands);
syncContext.ddlCommandList =
list_concat(syncContext.ddlCommandList, perObjCommands);
}
}
if (ddlCommands != NULL)
{
*ddlCommands = lappend(*ddlCommands, ENABLE_DDL_PROPAGATION);
}
}

View File

@ -226,10 +226,8 @@ start_metadata_sync_to_all_nodes(PG_FUNCTION_ARGS)
* start_metadata_sync_to_node().
*/
void
SyncNodeMetadataToNode(const char *nodeNameString, int32 nodePort)
SyncNodeMetadataToNode(MetadataSyncContext syncContext)
{
char *escapedNodeName = quote_literal_cstr(nodeNameString);
CheckCitusVersion(ERROR);
EnsureCoordinator();
EnsureModificationsCanRun();
@ -663,44 +661,44 @@ DropMetadataSnapshotOnNode(WorkerNode *workerNode)
EnsureSequentialModeMetadataOperations();
char *userName = CurrentUserName();
List *dropMetadataCommandList = NIL;
/*
* Detach partitions, break dependencies between sequences and table then
* remove shell tables first.
*/
List *detachPartitionCommandList = NIL;
MetadataSyncContext syncContext;
syncContext.syncImmediately = false;
DetachPartitionCommandList(NIL, &detachPartitionCommandList);
dropMetadataCommandList = list_concat(dropMetadataCommandList,
detachPartitionCommandList);
DetachPartitionCommandList(syncContext);
dropMetadataCommandList = lappend(dropMetadataCommandList,
BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND);
dropMetadataCommandList = lappend(dropMetadataCommandList,
REMOVE_ALL_SHELL_TABLES_COMMAND);
dropMetadataCommandList = list_concat(dropMetadataCommandList,
NodeMetadataDropCommands());
dropMetadataCommandList = lappend(dropMetadataCommandList,
LocalGroupIdUpdateCommand(0));
syncContext.ddlCommandList = lappend(syncContext.ddlCommandList,
BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND);
syncContext.ddlCommandList = lappend(syncContext.ddlCommandList,
REMOVE_ALL_SHELL_TABLES_COMMAND);
syncContext.ddlCommandList = list_concat(syncContext.ddlCommandList,
NodeMetadataDropCommands());
syncContext.ddlCommandList = lappend(syncContext.ddlCommandList,
LocalGroupIdUpdateCommand(0));
/* remove all dist table and object/table related metadata afterwards */
dropMetadataCommandList = lappend(dropMetadataCommandList, DELETE_ALL_PARTITIONS);
dropMetadataCommandList = lappend(dropMetadataCommandList, DELETE_ALL_SHARDS);
dropMetadataCommandList = lappend(dropMetadataCommandList, DELETE_ALL_PLACEMENTS);
dropMetadataCommandList = lappend(dropMetadataCommandList,
DELETE_ALL_DISTRIBUTED_OBJECTS);
dropMetadataCommandList = lappend(dropMetadataCommandList, DELETE_ALL_COLOCATION);
syncContext.ddlCommandList = lappend(syncContext.ddlCommandList,
DELETE_ALL_PARTITIONS);
syncContext.ddlCommandList = lappend(syncContext.ddlCommandList, DELETE_ALL_SHARDS);
syncContext.ddlCommandList = lappend(syncContext.ddlCommandList,
DELETE_ALL_PLACEMENTS);
syncContext.ddlCommandList = lappend(syncContext.ddlCommandList,
DELETE_ALL_DISTRIBUTED_OBJECTS);
syncContext.ddlCommandList = lappend(syncContext.ddlCommandList,
DELETE_ALL_COLOCATION);
Assert(superuser());
SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(
workerNode->workerName,
workerNode->workerPort,
userName,
dropMetadataCommandList);
syncContext.ddlCommandList);
}
@ -2667,8 +2665,7 @@ CreateTableMetadataOnWorkers(Oid relationId)
* empty list to not disable/enable DDL propagation for nothing.
*/
void
DetachPartitionCommandList(List *nodeToSyncMetadataConnections,
List **detachPartitionCommandList)
DetachPartitionCommandList(MetadataSyncContext syncContext)
{
List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
@ -2693,42 +2690,31 @@ DetachPartitionCommandList(List *nodeToSyncMetadataConnections,
Assert(PartitionTable(partitionRelOid));
char *detachCommand = GenerateDetachPartitionCommand(partitionRelOid);
if (list_length(nodeToSyncMetadataConnections) != 0)
if (syncContext.syncImmediately)
{
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(
nodeToSyncMetadataConnections),
list_make1(
detachCommand));
}
SyncMetadataCommands(syncContext, list_make1(detachCommand), false);
if (detachPartitionCommandList != NULL)
{
*detachPartitionCommandList =
lappend(*detachPartitionCommandList, detachCommand);
pfree(detachCommand);
}
else
{
pfree(detachCommand);
syncContext.ddlCommandList =
lappend(syncContext.ddlCommandList, detachCommand);
}
}
}
if (!foundAnyPartitionedTable)
if (foundAnyPartitionedTable && !syncContext.syncImmediately)
{
return;
}
if (detachPartitionCommandList != NULL)
{
*detachPartitionCommandList =
lcons(DISABLE_DDL_PROPAGATION, *detachPartitionCommandList);
syncContext.ddlCommandList =
lcons(DISABLE_DDL_PROPAGATION, syncContext.ddlCommandList);
/*
* We probably do not need this but as an extra precaution, we are enabling
* DDL propagation to switch back to original state.
*/
*detachPartitionCommandList = lappend(*detachPartitionCommandList,
ENABLE_DDL_PROPAGATION);
syncContext.ddlCommandList = lappend(syncContext.ddlCommandList,
ENABLE_DDL_PROPAGATION);
}
}

View File

@ -33,6 +33,7 @@
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata_sync_context.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_router_planner.h"
#include "distributed/pg_dist_node.h"
@ -100,15 +101,13 @@ static void InsertPlaceholderCoordinatorRecord(void);
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata
*nodeMetadata);
static void DeleteNodeRow(char *nodename, int32 nodeport);
static void SyncDistributedObjectsToNodeList(List *workerNodeList);
static void UpdateLocalGroupIdOnNode(WorkerNode *workerNode);
static void SyncPgDistTableMetadataToNodeList(List *nodeToSyncMetadataConnections);
static void InterTableRelationshipCommandList(List *nodeToSyncMetadataConnections,
List **ddlCommandList);
static void SyncDistributedObjectsToNodeList(MetadataSyncContext syncContext);
static void UpdateLocalGroupIdOnNode(MultiConnection *connection, WorkerNode *workerNode);
static void SyncPgDistTableMetadataToNodeList(MetadataSyncContext syncContext);
static void InterTableRelationshipCommandList(MetadataSyncContext syncContext);
static void BlockDistributedQueriesOnMetadataNodes(void);
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
static void PropagateNodeWideObjectsCommandList(List *nodeToSyncMetadataConnections,
List **nodeWideObjectCommandList);
static void PropagateNodeWideObjectsCommandList(MetadataSyncContext syncContext);
static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
static bool NodeIsLocal(WorkerNode *worker);
static void SetLockTimeoutLocally(int32 lock_cooldown);
@ -655,8 +654,7 @@ master_set_node_property(PG_FUNCTION_ARGS)
* for each citus table.
*/
static void
InterTableRelationshipCommandList(List *nodeToSyncMetadataConnections,
List **multipleTableIntegrationCommandList)
InterTableRelationshipCommandList(MetadataSyncContext syncContext)
{
List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
List *propagatedTableList = NIL;
@ -680,32 +678,27 @@ InterTableRelationshipCommandList(List *nodeToSyncMetadataConnections,
List *commandListForRelation =
InterTableRelationshipOfRelationCommandList(relationId);
if (list_length(nodeToSyncMetadataConnections) != 0)
if (syncContext.syncImmediately)
{
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(
nodeToSyncMetadataConnections),
syncContext.
nodeConnectionList),
commandListForRelation);
}
if (multipleTableIntegrationCommandList != NULL)
{
*multipleTableIntegrationCommandList = list_concat(
*multipleTableIntegrationCommandList,
commandListForRelation);
}
else
{
list_free_deep(commandListForRelation);
syncContext.ddlCommandList = list_concat(
syncContext.ddlCommandList,
commandListForRelation);
}
}
if (multipleTableIntegrationCommandList != NULL)
if (!syncContext.syncImmediately)
{
*multipleTableIntegrationCommandList = lcons(DISABLE_DDL_PROPAGATION,
*multipleTableIntegrationCommandList);
*multipleTableIntegrationCommandList = lappend(
*multipleTableIntegrationCommandList,
syncContext.ddlCommandList = lcons(DISABLE_DDL_PROPAGATION,
syncContext.ddlCommandList);
syncContext.ddlCommandList = lappend(
syncContext.ddlCommandList,
ENABLE_DDL_PROPAGATION);
}
}
@ -716,8 +709,7 @@ InterTableRelationshipCommandList(List *nodeToSyncMetadataConnections,
* (except pg_dist_node) metadata. We call them as table metadata.
*/
void
PgDistTableMetadataSyncCommandList(List *nodeToSyncMetadataConnections,
List **metadataSnapshotCommandList)
PgDistTableMetadataSyncCommandList(MetadataSyncContext syncContext)
{
List *distributedTableList = CitusTableList();
List *propagatedTableList = NIL;
@ -732,24 +724,27 @@ PgDistTableMetadataSyncCommandList(List *nodeToSyncMetadataConnections,
}
}
/* remove all dist table and object related metadata first */
*metadataSnapshotCommandList = lappend(*metadataSnapshotCommandList,
DELETE_ALL_PARTITIONS);
*metadataSnapshotCommandList = lappend(*metadataSnapshotCommandList,
DELETE_ALL_SHARDS);
*metadataSnapshotCommandList = lappend(*metadataSnapshotCommandList,
DELETE_ALL_PLACEMENTS);
*metadataSnapshotCommandList = lappend(*metadataSnapshotCommandList,
DELETE_ALL_DISTRIBUTED_OBJECTS);
*metadataSnapshotCommandList = lappend(*metadataSnapshotCommandList,
DELETE_ALL_COLOCATION);
if (list_length(nodeToSyncMetadataConnections) != 0)
List *pgDistTableMetadata = NIL;
/* remove all dist table and object related metadata first */
pgDistTableMetadata = lappend(pgDistTableMetadata,
DELETE_ALL_PARTITIONS);
pgDistTableMetadata = lappend(pgDistTableMetadata,
DELETE_ALL_SHARDS);
pgDistTableMetadata = lappend(pgDistTableMetadata,
DELETE_ALL_PLACEMENTS);
pgDistTableMetadata = lappend(pgDistTableMetadata,
DELETE_ALL_DISTRIBUTED_OBJECTS);
pgDistTableMetadata = lappend(pgDistTableMetadata,
DELETE_ALL_COLOCATION);
if (syncContext.syncImmediately)
{
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(
nodeToSyncMetadataConnections),
*
metadataSnapshotCommandList);
syncContext.
nodeConnectionList),
pgDistTableMetadata);
}
/* create pg_dist_partition, pg_dist_shard and pg_dist_placement entries */
@ -758,38 +753,105 @@ PgDistTableMetadataSyncCommandList(List *nodeToSyncMetadataConnections,
List *tableMetadataCreateCommandList =
CitusTableMetadataCreateCommandList(cacheEntry->relationId);
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(
nodeToSyncMetadataConnections),
tableMetadataCreateCommandList);
if (syncContext.syncImmediately)
{
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(
syncContext.
nodeConnectionList),
tableMetadataCreateCommandList);
*metadataSnapshotCommandList = list_concat(*metadataSnapshotCommandList,
tableMetadataCreateCommandList);
list_free_deep(tableMetadataCreateCommandList);
}
else
{
pgDistTableMetadata = list_concat(pgDistTableMetadata,
tableMetadataCreateCommandList);
}
}
/* commands to insert pg_dist_colocation entries */
List *colocationGroupSyncCommandList = ColocationGroupCreateCommandList();
*metadataSnapshotCommandList = list_concat(*metadataSnapshotCommandList,
colocationGroupSyncCommandList);
if (syncContext.syncImmediately)
{
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(
syncContext.
nodeConnectionList),
colocationGroupSyncCommandList);
list_free_deep(colocationGroupSyncCommandList);
}
else
{
pgDistTableMetadata = list_concat(pgDistTableMetadata,
colocationGroupSyncCommandList);
}
List *distributedObjectSyncCommandList = DistributedObjectMetadataSyncCommandList();
*metadataSnapshotCommandList = list_concat(*metadataSnapshotCommandList,
distributedObjectSyncCommandList);
if (syncContext.syncImmediately)
{
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(
syncContext.
nodeConnectionList),
distributedObjectSyncCommandList);
*metadataSnapshotCommandList = lcons(DISABLE_DDL_PROPAGATION,
*metadataSnapshotCommandList);
*metadataSnapshotCommandList = lappend(*metadataSnapshotCommandList,
ENABLE_DDL_PROPAGATION);
list_free_deep(distributedObjectSyncCommandList);
}
else
{
pgDistTableMetadata = list_concat(pgDistTableMetadata,
distributedObjectSyncCommandList);
}
if (!syncContext.syncImmediately)
{
pgDistTableMetadata = lcons(DISABLE_DDL_PROPAGATION,
pgDistTableMetadata);
pgDistTableMetadata = lappend(pgDistTableMetadata,
ENABLE_DDL_PROPAGATION);
syncContext.ddlCommandList = list_concat(syncContext.ddlCommandList,
pgDistTableMetadata);
}
}
void
SyncMetadataCommands(MetadataSyncContext syncContext, List *commandList,
bool raiseErrors)
{
/* iterate over the commands and execute them in the same connection */
const char *commandString = NULL;
foreach_ptr(commandString, commandList)
{
MultiConnection *connection = NULL;
foreach_ptr(connection, syncContext.nodeConnectionList)
{
bool success = SendRemoteCommand(connection, commandString);
if (!success)
{
HandleRemoteTransactionConnectionError(connection, raiseErrors);
}
}
WaitForAllConnections(syncContext.nodeConnectionList, raiseErrors);
}
MultiConnection *connection = NULL;
foreach_ptr(connection, syncContext.nodeConnectionList)
{
ForgetResults(connection);
}
}
/*
* PropagateNodeWideObjectsCommandList is called during node activation to
* propagate any object that should be propagated for every node. These are
* generally not linked to any distributed object but change system wide behaviour.
*/
static void
PropagateNodeWideObjectsCommandList(List *nodeToSyncMetadataConnections,
List **nodeWideObjectCommandList)
PropagateNodeWideObjectsCommandList(MetadataSyncContext syncContext)
{
if (EnableAlterRoleSetPropagation)
{
@ -799,23 +861,23 @@ PropagateNodeWideObjectsCommandList(List *nodeToSyncMetadataConnections,
*/
List *alterRoleSetCommands = GenerateAlterRoleSetCommandForRole(InvalidOid);
if (nodeWideObjectCommandList != NULL)
if (!syncContext.syncImmediately)
{
*nodeWideObjectCommandList = list_concat(*nodeWideObjectCommandList,
alterRoleSetCommands);
/* if there are command wrap them in enable_ddl_propagation off */
*nodeWideObjectCommandList = lcons(DISABLE_DDL_PROPAGATION,
*nodeWideObjectCommandList);
*nodeWideObjectCommandList = lappend(*nodeWideObjectCommandList,
ENABLE_DDL_PROPAGATION);
}
syncContext.ddlCommandList =
lappend(syncContext.ddlCommandList, DISABLE_DDL_PROPAGATION);
if (list_length(nodeToSyncMetadataConnections) != 0)
syncContext.ddlCommandList =
list_concat(syncContext.ddlCommandList, alterRoleSetCommands);
syncContext.ddlCommandList =
lappend(syncContext.ddlCommandList, ENABLE_DDL_PROPAGATION);
}
else
{
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(
nodeToSyncMetadataConnections),
alterRoleSetCommands);
bool raiseErrors = true;
SyncMetadataCommands(syncContext, alterRoleSetCommands, raiseErrors);
}
}
}
@ -837,63 +899,51 @@ PropagateNodeWideObjectsCommandList(List *nodeToSyncMetadataConnections,
* requires it.
*/
void
SyncDistributedObjectsCommandList(List *nodeToSyncMetadataConnections, List **commandList)
SyncDistributedObjectsCommandList(MetadataSyncContext syncContext)
{
/*
* Propagate node wide objects. It includes only roles for now.
*/
PropagateNodeWideObjectsCommandList(nodeToSyncMetadataConnections, commandList);
PropagateNodeWideObjectsCommandList(syncContext);
/*
* Detach partitions, break dependencies between sequences and table then
* remove shell tables first.
*/
DetachPartitionCommandList(nodeToSyncMetadataConnections, commandList);
DetachPartitionCommandList(syncContext);
if (list_length(nodeToSyncMetadataConnections) != 0)
if (syncContext.syncImmediately)
{
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(
nodeToSyncMetadataConnections),
syncContext.
nodeConnectionList),
list_make1(
BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND));
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(
nodeToSyncMetadataConnections),
syncContext.
nodeConnectionList),
list_make1(
REMOVE_ALL_SHELL_TABLES_COMMAND));
}
if (commandList != NULL)
else
{
*commandList = lappend(*commandList,
BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND);
*commandList = lappend(*commandList, REMOVE_ALL_SHELL_TABLES_COMMAND);
syncContext.ddlCommandList = lappend(syncContext.ddlCommandList,
BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND);
syncContext.ddlCommandList = lappend(syncContext.ddlCommandList,
REMOVE_ALL_SHELL_TABLES_COMMAND);
}
/*
* Replicate all objects of the pg_dist_object to the remote node.
*/
List *replicateAllObjectsToNodeCommandList = NIL;
ReplicateAllObjectsToNodeCommandList(nodeToSyncMetadataConnections,
&replicateAllObjectsToNodeCommandList);
if (commandList != NULL)
{
*commandList = list_concat(*commandList, replicateAllObjectsToNodeCommandList);
}
ReplicateAllObjectsToNodeCommandList(syncContext);
/*
* After creating each table, handle the inter table relationship between
* those tables.
*/
List *interTableRelationshipCommandList = NIL;
InterTableRelationshipCommandList(nodeToSyncMetadataConnections,
&interTableRelationshipCommandList);
if (commandList != NULL)
{
*commandList = list_concat(*commandList, interTableRelationshipCommandList);
}
InterTableRelationshipCommandList(syncContext);
}
@ -907,13 +957,8 @@ SyncDistributedObjectsCommandList(List *nodeToSyncMetadataConnections, List **co
* since all the dependencies should be present in the coordinator already.
*/
static void
SyncDistributedObjectsToNodeList(List *nodeToSyncMetadataConnections)
SyncDistributedObjectsToNodeList(MetadataSyncContext syncContext)
{
if (nodeToSyncMetadataConnections == NIL)
{
return;
}
EnsureSequentialModeMetadataOperations();
Assert(ShouldPropagate());
@ -921,7 +966,7 @@ SyncDistributedObjectsToNodeList(List *nodeToSyncMetadataConnections)
/* send commands to new workers, the current user should be a superuser */
Assert(superuser());
SyncDistributedObjectsCommandList(nodeToSyncMetadataConnections, NULL);
SyncDistributedObjectsCommandList(syncContext);
}
@ -929,18 +974,14 @@ SyncDistributedObjectsToNodeList(List *nodeToSyncMetadataConnections)
* UpdateLocalGroupIdOnNode updates local group id on node.
*/
static void
UpdateLocalGroupIdOnNode(WorkerNode *workerNode)
UpdateLocalGroupIdOnNode(MultiConnection *connection, WorkerNode *workerNode)
{
if (NodeIsPrimary(workerNode) && !NodeIsCoordinator(workerNode))
{
List *commandList = list_make1(LocalGroupIdUpdateCommand(workerNode->groupId));
/* send commands to new workers, the current user should be a superuser */
Assert(superuser());
SendMetadataCommandListToWorkerListInCoordinatedTransaction(
list_make1(workerNode),
CurrentUserName(),
commandList);
SendCommandListToWorkerOutsideTransactionWithConnection(connection, commandList);
}
}
@ -951,14 +992,12 @@ UpdateLocalGroupIdOnNode(WorkerNode *workerNode)
*
*/
static void
SyncPgDistTableMetadataToNodeList(List *nodeToSyncMetadataConnections)
SyncPgDistTableMetadataToNodeList(MetadataSyncContext syncContext)
{
/* send commands to new workers, the current user should be a superuser */
Assert(superuser());
List *syncPgDistMetadataCommandList = NIL;
PgDistTableMetadataSyncCommandList(nodeToSyncMetadataConnections,
&syncPgDistMetadataCommandList);
PgDistTableMetadataSyncCommandList(syncContext);
}
@ -1180,7 +1219,11 @@ ActivateNodeList(List *nodeList)
List *nodeToSyncMetadata = NIL;
List *nodeToSyncMetadataConnections = NIL;
MetadataSyncContext syncContext;
/* we don't need to collect any the ddl commands */
syncContext.syncImmediately = true;
syncContext.nodeConnectionList = NIL;
WorkerNode *node = NULL;
foreach_ptr(node, nodeList)
@ -1232,11 +1275,6 @@ ActivateNodeList(List *nodeList)
SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced,
BoolGetDatum(true));
/*
* Update local group id first, as object dependency logic requires to have
* updated local group id.
*/
UpdateLocalGroupIdOnNode(workerNode);
nodeToSyncMetadata = lappend(nodeToSyncMetadata, workerNode);
@ -1247,13 +1285,19 @@ ActivateNodeList(List *nodeList)
ClaimConnectionExclusively(connection);
nodeToSyncMetadataConnections =
lappend(nodeToSyncMetadataConnections, connection);
syncContext.nodeConnectionList =
lappend(syncContext.nodeConnectionList, connection);
SendCommandListToWorkerOutsideTransactionWithConnection(linitial(
nodeToSyncMetadataConnections),
SendCommandListToWorkerOutsideTransactionWithConnection(connection,
list_make1(
DISABLE_DDL_PROPAGATION));
/*
* Update local group id first, as object dependency logic requires to have
* updated local group id.
*/
UpdateLocalGroupIdOnNode(connection, workerNode);
}
}
@ -1262,24 +1306,22 @@ ActivateNodeList(List *nodeList)
* replicating reference tables to the remote node, as reference tables may
* need such objects.
*/
SyncDistributedObjectsToNodeList(nodeToSyncMetadataConnections);
SyncDistributedObjectsToNodeList(syncContext);
/*
* Sync node metadata. We must sync node metadata before syncing table
* related pg_dist_xxx metadata. Since table related metadata requires
* to have right pg_dist_node entries.
*/
foreach_ptr(node, nodeToSyncMetadata)
{
SyncNodeMetadataToNode(node->workerName, node->workerPort);
}
SyncNodeMetadataToNode(node->workerName, node->workerPort);
/*
* As the last step, sync the table related metadata to the remote node.
* We must handle it as the last step because of limitations shared with
* above comments.
*/
SyncPgDistTableMetadataToNodeList(nodeToSyncMetadataConnections);
SyncPgDistTableMetadataToNodeList(syncContext);
foreach_ptr(node, nodeList)
{

View File

@ -49,17 +49,21 @@ activate_node_snapshot(PG_FUNCTION_ARGS)
*/
WorkerNode *dummyWorkerNode = GetFirstPrimaryWorkerNode();
List *updateLocalGroupCommand =
list_make1(LocalGroupIdUpdateCommand(dummyWorkerNode->groupId));
List *syncDistObjCommands = NIL;
SyncDistributedObjectsCommandList(NIL, &syncDistObjCommands);
MetadataSyncContext syncContext = { false, NIL, NIL };
SyncDistributedObjectsCommandList(syncContext);
List *syncDistObjCommands = syncContext.ddlCommandList;
List *dropSnapshotCommands = NodeMetadataDropCommands();
List *createSnapshotCommands = NodeMetadataCreateCommands();
List *pgDistTableMetadataSyncCommands = NIL;
PgDistTableMetadataSyncCommandList(NIL, &pgDistTableMetadataSyncCommands);
syncContext.ddlCommandList = NIL;
PgDistTableMetadataSyncCommandList(syncContext);
List *pgDistTableMetadataSyncCommands = syncContext.ddlCommandList;
List *activateNodeCommandList = NIL;
int activateNodeCommandIndex = 0;

View File

@ -76,6 +76,7 @@ typedef struct DDLJob
List *taskList; /* worker DDL tasks to execute */
} DDLJob;
extern ProcessUtility_hook_type PrevProcessUtility;
extern void multi_ProcessUtility(PlannedStmt *pstmt, const char *queryString,

View File

@ -48,7 +48,6 @@ typedef struct SequenceInfo
/* Functions declarations for metadata syncing */
extern void SyncNodeMetadataToNode(const char *nodeNameString, int32 nodePort);
extern void SyncCitusTableMetadata(Oid relationId);
extern void EnsureSequentialModeMetadataOperations(void);
extern bool ClusterHasKnownMetadataWorkers(void);
@ -90,8 +89,7 @@ extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int sha
extern TableDDLCommand * TruncateTriggerCreateCommand(Oid relationId);
extern void CreateInterTableRelationshipOfRelationOnWorkers(Oid relationId);
extern List * InterTableRelationshipOfRelationCommandList(Oid relationId);
extern void DetachPartitionCommandList(List *nodeToSyncMetadataConnections,
List **detachPartitionCommandList);
extern void DetachPartitionCommandList(MetadataSyncContext syncContext);
extern void SyncNodeMetadataToNodes(void);
extern BackgroundWorkerHandle * SpawnSyncNodeMetadataToNodes(Oid database, Oid owner);
extern void SyncNodeMetadataToNodesMain(Datum main_arg);

View File

@ -264,6 +264,7 @@ typedef struct BackgroundTask
} __nullable_storage;
} BackgroundTask;
#define SET_NULLABLE_FIELD(ptr, field, value) \
(ptr)->__nullable_storage.field = (value); \
(ptr)->field = &((ptr)->__nullable_storage.field)
@ -343,8 +344,7 @@ extern List * GetAllDependencyCreateDDLCommands(const List *dependencies);
extern bool ShouldPropagate(void);
extern bool ShouldPropagateCreateInCoordinatedTransction(void);
extern bool ShouldPropagateAnyObject(List *addresses);
extern void ReplicateAllObjectsToNodeCommandList(List *nodeToSyncMetadataConnections,
List **ddlCommands);
extern void ReplicateAllObjectsToNodeCommandList(MetadataSyncContext syncContext);
/* Remaining metadata utility functions */
extern Oid TableOwnerOid(Oid relationId);

View File

@ -16,6 +16,8 @@
#include "postgres.h"
#include "distributed/metadata_utility.h"
#include "distributed/metadata_sync_context.h"
#include "storage/lmgr.h"
#include "storage/lockdefs.h"
#include "nodes/pg_list.h"
@ -105,11 +107,11 @@ extern WorkerNode * SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnI
Datum value);
extern uint32 CountPrimariesWithMetadata(void);
extern WorkerNode * GetFirstPrimaryWorkerNode(void);
extern void SyncDistributedObjectsCommandList(List *nodeToSyncMetadataConnections,
List **commandList);
extern void PgDistTableMetadataSyncCommandList(List *nodeToSyncMetadataConnections,
List **metadataSnapshotCommandList);
extern void SyncDistributedObjectsCommandList(MetadataSyncContext syncContext);
extern void PgDistTableMetadataSyncCommandList(MetadataSyncContext syncContext);
extern void
SyncMetadataCommands(MetadataSyncContext syncContext, List *commandList,
bool raiseErrors);
/* Function declarations for worker node utilities */
extern int CompareWorkerNodes(const void *leftElement, const void *rightElement);
extern uint32 WorkerNodeHashCode(const void *key, Size keySize);