Separate command list generation and execution

velioglu/wo_seq_test_1
Burak Velioglu 2022-01-16 23:49:31 +03:00
parent 381871890f
commit aedd09ffdf
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
3 changed files with 106 additions and 106 deletions

View File

@ -368,12 +368,12 @@ GetCitusTableDDLCommandList(Oid relationId)
/* /*
* ReplicateAllDependenciesToNode replicate all previously marked objects to a worker * ReplicateAllDependenciesToNodeCommandList returns commands to replicate all
* node. The function also sets clusterHasDistributedFunction if there are any * previously marked objects to a worker node. The function also sets
* distributed functions. * clusterHasDistributedFunction if there are any distributed functions.
*/ */
void List *
ReplicateAllDependenciesToNode(const char *nodeName, int nodePort) ReplicateAllDependenciesToNodeCommandList(const char *nodeName, int nodePort)
{ {
List *ddlCommands = NIL; List *ddlCommands = NIL;
@ -412,22 +412,12 @@ ReplicateAllDependenciesToNode(const char *nodeName, int nodePort)
ddlCommands = list_concat(ddlCommands, ddlCommands = list_concat(ddlCommands,
GetDependencyCreateDDLCommands(dependency)); GetDependencyCreateDDLCommands(dependency));
} }
if (list_length(ddlCommands) <= 0)
{
/* no commands to replicate dependencies to the new worker */
return;
}
/* since we are executing ddl commands lets disable propagation, primarily for mx */ /* since we are executing ddl commands lets disable propagation, primarily for mx */
ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands); ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands);
ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION); ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION);
/* send commands to new workers, the current user should a superuser */ return ddlCommands;
Assert(superuser());
SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName,
nodePort,
CurrentUserName(),
ddlCommands);
} }

View File

@ -107,14 +107,16 @@ static void InsertPlaceholderCoordinatorRecord(void);
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata
*nodeMetadata); *nodeMetadata);
static void DeleteNodeRow(char *nodename, int32 nodeport); static void DeleteNodeRow(char *nodename, int32 nodeport);
static void SetUpObjectMetadata(WorkerNode *workerNode); static List * ResyncMetadataCommandList();
static void ClearDistributedObjectsFromNode(WorkerNode *workerNode); static List * MetadataSetupCommandList();
static void ClearDistributedTablesFromNode(WorkerNode *workerNode); static List * ClearMetadataCommandList();
static void UpdatePgDistLocalGroupOnNode(WorkerNode *workerNode); static List * ClearShellTablesCommandList();
static List * RecreateDistributedTablesWithDependenciesCommandList(
WorkerNode *workerNode);
static void SetUpDistributedTableWithDependencies(WorkerNode *workerNode); static void SetUpDistributedTableWithDependencies(WorkerNode *workerNode);
static void SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode); static List * MultipleDistributedTableIntegrationsCommandList();
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
static void PropagateNodeWideObjects(WorkerNode *newWorkerNode); static List * PropagateNodeWideObjectsCommandList();
static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort); static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
static bool NodeIsLocal(WorkerNode *worker); static bool NodeIsLocal(WorkerNode *worker);
static void SetLockTimeoutLocally(int32 lock_cooldown); static void SetLockTimeoutLocally(int32 lock_cooldown);
@ -580,16 +582,15 @@ master_set_node_property(PG_FUNCTION_ARGS)
/* /*
* SetUpMultipleDistributedTableIntegrations set up the multiple integrations * MultipleDistributedTableIntegrationsCommandList returns the command list to
* including * set up the multiple integrations including
* *
* (i) Foreign keys * (i) Foreign keys
* (ii) Partionining hierarchy * (ii) Partionining hierarchy
* *
* on the given worker node.
*/ */
static void static List *
SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode) MultipleDistributedTableIntegrationsCommandList()
{ {
List *distributedTableList = CitusTableList(); List *distributedTableList = CitusTableList();
List *propagatedTableList = NIL; List *propagatedTableList = NIL;
@ -650,21 +651,16 @@ SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode)
multipleTableIntegrationCommandList = lappend(multipleTableIntegrationCommandList, multipleTableIntegrationCommandList = lappend(multipleTableIntegrationCommandList,
ENABLE_DDL_PROPAGATION); ENABLE_DDL_PROPAGATION);
/* send commands to new workers, the current user should be a superuser */ return multipleTableIntegrationCommandList;
Assert(superuser());
SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,
workerNode->workerPort,
CurrentUserName(),
multipleTableIntegrationCommandList);
} }
/* /*
* SetUpObjectMetadata sets up the metadata depending on the distributed object * MetadataSetupCommandList get the command list to set up the metadata
* on the given node. * depending on the distributed object on the given node.
*/ */
static void static List *
SetUpObjectMetadata(WorkerNode *workerNode) MetadataSetupCommandList()
{ {
List *distributedTableList = CitusTableList(); List *distributedTableList = CitusTableList();
List *propagatedTableList = NIL; List *propagatedTableList = NIL;
@ -721,71 +717,81 @@ SetUpObjectMetadata(WorkerNode *workerNode)
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
ENABLE_DDL_PROPAGATION); ENABLE_DDL_PROPAGATION);
/* send commands to new workers, the current user should be a superuser */ return metadataSnapshotCommandList;
Assert(superuser());
SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,
workerNode->workerPort,
CurrentUserName(),
metadataSnapshotCommandList);
} }
/* /*
* UpdatePgDistLocalGroupOnNode updates the pg_dist_local_group on the given node * RecreateDistributedTablesWithDependenciesCommandList return command list to recreate
* distributed tables with command list.
*/ */
static void static List *
UpdatePgDistLocalGroupOnNode(WorkerNode *workerNode) RecreateDistributedTablesWithDependenciesCommandList(WorkerNode *workerNode)
{ {
/* generate and add the local group id's update query */ List *commandList = NIL;
char *localGroupIdUpdateCommand = LocalGroupIdUpdateCommand(workerNode->groupId);
List *localGroupIdUpdateCommandList = list_make1(localGroupIdUpdateCommand); commandList = list_concat(commandList, DetachPartitionCommandList());
commandList = list_concat(commandList, ClearShellTablesCommandList());
commandList = list_concat(commandList, PropagateNodeWideObjectsCommandList());
commandList = list_concat(commandList, list_make1(LocalGroupIdUpdateCommand(
workerNode->groupId)));
commandList = list_concat(commandList, ReplicateAllDependenciesToNodeCommandList(
workerNode->workerName, workerNode->workerPort));
commandList = list_concat(commandList,
MultipleDistributedTableIntegrationsCommandList());
/* send commands to new workers, the current user should be a superuser */ return commandList;
Assert(superuser());
SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,
workerNode->workerPort,
CurrentUserName(),
localGroupIdUpdateCommandList);
} }
/* /*
* ClearDistributedTablesFromNode clear (shell) distributed tables from the given node. * ClearShellTablesCommandList returns the command list to clear (shell) distributed
* tables from the given node.
*/ */
static void static List *
ClearDistributedTablesFromNode(WorkerNode *workerNode) ClearShellTablesCommandList()
{ {
List *clearDistributedTablesCommandList = NIL; List *clearShellTablesCommandList = NIL;
List *detachPartitionCommandList = DetachPartitionCommandList(); clearShellTablesCommandList = lappend(clearShellTablesCommandList,
REMOVE_ALL_CLUSTERED_TABLES_ONLY_COMMAND);
clearDistributedTablesCommandList = list_concat(clearDistributedTablesCommandList, clearShellTablesCommandList = list_concat(list_make1(DISABLE_DDL_PROPAGATION),
detachPartitionCommandList); clearShellTablesCommandList);
clearShellTablesCommandList = list_concat(clearShellTablesCommandList,
list_make1(ENABLE_DDL_PROPAGATION));
clearDistributedTablesCommandList = lappend(clearDistributedTablesCommandList, return clearShellTablesCommandList;
REMOVE_ALL_CLUSTERED_TABLES_ONLY_COMMAND);
clearDistributedTablesCommandList = list_concat(list_make1(DISABLE_DDL_PROPAGATION),
clearDistributedTablesCommandList);
clearDistributedTablesCommandList = list_concat(clearDistributedTablesCommandList,
list_make1(
ENABLE_DDL_PROPAGATION));
/* send commands to new workers, the current user should be a superuser */
Assert(superuser());
SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,
workerNode->workerPort,
CurrentUserName(),
clearDistributedTablesCommandList);
} }
/* /*
* ClearDistributedObjectsFromNode clears all the distributed objects, metadata and partition hierarchy from the given node. * ResyncMetadataCommandList returns the command list to resync all the
* distributed table related metadata.
*/ */
static void static List *
ClearDistributedObjectsFromNode(WorkerNode *workerNode) ResyncMetadataCommandList()
{
List *resyncMetadataCommandList = NIL;
List *clearMetadataCommandList = ClearMetadataCommandList();
resyncMetadataCommandList = list_concat(resyncMetadataCommandList,
clearMetadataCommandList);
List *setupMetadataCommandList = MetadataSetupCommandList();
resyncMetadataCommandList = list_concat(resyncMetadataCommandList,
setupMetadataCommandList);
return resyncMetadataCommandList;
}
/*
* ClearMetadataCommandList returns the command list to clear all the
* distributed objects and metadata from the given node.
*/
static List *
ClearMetadataCommandList()
{ {
List *clearDistTableInfoCommandList = NIL; List *clearDistTableInfoCommandList = NIL;
@ -806,11 +812,7 @@ ClearDistributedObjectsFromNode(WorkerNode *workerNode)
clearDistTableInfoCommandList = list_concat(clearDistTableInfoCommandList, list_make1( clearDistTableInfoCommandList = list_concat(clearDistTableInfoCommandList, list_make1(
ENABLE_DDL_PROPAGATION)); ENABLE_DDL_PROPAGATION));
char *currentUser = CurrentUserName(); return clearDistTableInfoCommandList;
SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,
workerNode->workerPort,
currentUser,
clearDistTableInfoCommandList);
} }
@ -835,12 +837,17 @@ SetUpDistributedTableWithDependencies(WorkerNode *newWorkerNode)
Assert(ShouldPropagate()); Assert(ShouldPropagate());
if (!NodeIsCoordinator(newWorkerNode)) if (!NodeIsCoordinator(newWorkerNode))
{ {
ClearDistributedTablesFromNode(newWorkerNode); List *commandList = RecreateDistributedTablesWithDependenciesCommandList(
PropagateNodeWideObjects(newWorkerNode); newWorkerNode);
UpdatePgDistLocalGroupOnNode(newWorkerNode);
ReplicateAllDependenciesToNode(newWorkerNode->workerName, /* send commands to new workers, the current user should be a superuser */
newWorkerNode->workerPort); Assert(superuser());
SetUpMultipleDistributedTableIntegrations(newWorkerNode); SendMetadataCommandListToWorkerInCoordinatedTransaction(
newWorkerNode->workerName,
newWorkerNode->
workerPort,
CurrentUserName(),
commandList);
} }
if (ReplicateReferenceTablesOnActivate) if (ReplicateReferenceTablesOnActivate)
@ -853,12 +860,12 @@ SetUpDistributedTableWithDependencies(WorkerNode *newWorkerNode)
/* /*
* PropagateNodeWideObjects is called during node activation to propagate any object that * PropagateNodeWideObjectsCommandList is called during node activation to
* should be propagated for every node. These are generally not linked to any distributed * propagate any object that should be propagated for every node. These are
* object but change system wide behaviour. * generally not linked to any distributed object but change system wide behaviour.
*/ */
static void static List *
PropagateNodeWideObjects(WorkerNode *newWorkerNode) PropagateNodeWideObjectsCommandList()
{ {
/* collect all commands */ /* collect all commands */
List *ddlCommands = NIL; List *ddlCommands = NIL;
@ -878,14 +885,9 @@ PropagateNodeWideObjects(WorkerNode *newWorkerNode)
/* if there are command wrap them in enable_ddl_propagation off */ /* if there are command wrap them in enable_ddl_propagation off */
ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands); ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands);
ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION); ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION);
/* send commands to new workers, the current user should be a superuser */
Assert(superuser());
SendMetadataCommandListToWorkerInCoordinatedTransaction(newWorkerNode->workerName,
newWorkerNode->workerPort,
CurrentUserName(),
ddlCommands);
} }
return ddlCommands;
} }
@ -1163,8 +1165,15 @@ ActivateNode(char *nodeName, int nodePort)
if (!NodeIsCoordinator(workerNode)) if (!NodeIsCoordinator(workerNode))
{ {
ClearDistributedObjectsFromNode(workerNode); List *metadataUpdateCommandList = ResyncMetadataCommandList();
SetUpObjectMetadata(workerNode);
/* send commands to the new worker, the current user should be a superuser */
Assert(superuser());
SendMetadataCommandListToWorkerInCoordinatedTransaction(
workerNode->workerName,
workerNode->workerPort,
CurrentUserName(),
metadataUpdateCommandList);
} }
} }

View File

@ -249,7 +249,8 @@ extern void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target);
extern List * GetDistributableDependenciesForObject(const ObjectAddress *target); extern List * GetDistributableDependenciesForObject(const ObjectAddress *target);
extern bool ShouldPropagate(void); extern bool ShouldPropagate(void);
extern bool ShouldPropagateObject(const ObjectAddress *address); extern bool ShouldPropagateObject(const ObjectAddress *address);
extern void ReplicateAllDependenciesToNode(const char *nodeName, int nodePort); extern List * ReplicateAllDependenciesToNodeCommandList(const char *nodeName, int
nodePort);
/* Remaining metadata utility functions */ /* Remaining metadata utility functions */
extern char * TableOwner(Oid relationId); extern char * TableOwner(Oid relationId);