address feedback

pull/7278/head
Onur Tirtir 2023-11-10 12:36:59 +03:00
parent ab7bf640b7
commit 78be00b1b3
9 changed files with 64 additions and 64 deletions

View File

@ -47,7 +47,7 @@ static char * DropTableIfExistsCommand(Oid relationId);
* Note; only the actual objects are created via a separate session, the records to * Note; only the actual objects are created via a separate session, the records to
* pg_dist_object are created in this session. As a side effect the objects could be * pg_dist_object are created in this session. As a side effect the objects could be
* created on the nodes without a catalog entry. Updates to the objects on local node * created on the nodes without a catalog entry. Updates to the objects on local node
* are not propagated to the other nodes until the record is visible on local node. * are not propagated to the remote nodes until the record is visible on local node.
* *
* This is solved by creating the dependencies in an idempotent manner, either via * This is solved by creating the dependencies in an idempotent manner, either via
* postgres native CREATE IF NOT EXISTS, or citus helper functions. * postgres native CREATE IF NOT EXISTS, or citus helper functions.
@ -95,7 +95,7 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
* either get it now, or get it in citus_add_node after this transaction finishes and * either get it now, or get it in citus_add_node after this transaction finishes and
* the pg_dist_object record becomes visible. * the pg_dist_object record becomes visible.
*/ */
List *otherNodes = ActivePrimaryOtherNodesList(RowShareLock); List *remoteNodeList = ActivePrimaryRemoteNodeList(RowShareLock);
/* /*
* Lock dependent objects explicitly to make sure same DDL command won't be sent * Lock dependent objects explicitly to make sure same DDL command won't be sent
@ -127,12 +127,12 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
*/ */
if (HasAnyDependencyInPropagatedObjects(target)) if (HasAnyDependencyInPropagatedObjects(target))
{ {
SendCommandListToOtherNodesWithMetadata(ddlCommands); SendCommandListToRemoteNodesWithMetadata(ddlCommands);
} }
else else
{ {
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, otherNodes) foreach_ptr(workerNode, remoteNodeList)
{ {
const char *nodeName = workerNode->workerName; const char *nodeName = workerNode->workerName;
uint32 nodePort = workerNode->workerPort; uint32 nodePort = workerNode->workerPort;
@ -144,8 +144,8 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
} }
/* /*
* We do this after creating the objects on other nodes, we make sure * We do this after creating the objects on remote nodes, we make sure
* that objects have been created on other nodes before marking them * that objects have been created on remote nodes before marking them
* distributed, so MarkObjectDistributed wouldn't fail. * distributed, so MarkObjectDistributed wouldn't fail.
*/ */
foreach_ptr(dependency, dependenciesWithCommands) foreach_ptr(dependency, dependenciesWithCommands)

View File

@ -185,7 +185,7 @@ PostprocessAlterRoleStmt(Node *node, const char *queryString)
(void *) CreateAlterRoleIfExistsCommand(stmt), (void *) CreateAlterRoleIfExistsCommand(stmt),
ENABLE_DDL_PROPAGATION); ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(OTHER_NODES, commands); return NodeDDLTaskList(REMOTE_NODES, commands);
} }
@ -240,7 +240,7 @@ PreprocessAlterRoleSetStmt(Node *node, const char *queryString,
(void *) sql, (void *) sql,
ENABLE_DDL_PROPAGATION); ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(OTHER_NODES, commandList); return NodeDDLTaskList(REMOTE_NODES, commandList);
} }
@ -946,7 +946,7 @@ PreprocessCreateRoleStmt(Node *node, const char *queryString,
commands = lappend(commands, ENABLE_DDL_PROPAGATION); commands = lappend(commands, ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(OTHER_NODES, commands); return NodeDDLTaskList(REMOTE_NODES, commands);
} }
@ -1055,7 +1055,7 @@ PreprocessDropRoleStmt(Node *node, const char *queryString,
sql, sql,
ENABLE_DDL_PROPAGATION); ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(OTHER_NODES, commands); return NodeDDLTaskList(REMOTE_NODES, commands);
} }
@ -1172,7 +1172,7 @@ PreprocessGrantRoleStmt(Node *node, const char *queryString,
sql, sql,
ENABLE_DDL_PROPAGATION); ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(OTHER_NODES, commands); return NodeDDLTaskList(REMOTE_NODES, commands);
} }
@ -1345,7 +1345,7 @@ PreprocessAlterRoleRenameStmt(Node *node, const char *queryString,
(void *) sql, (void *) sql,
ENABLE_DDL_PROPAGATION); ENABLE_DDL_PROPAGATION);
return NodeDDLTaskList(OTHER_NODES, commands); return NodeDDLTaskList(REMOTE_NODES, commands);
} }

View File

@ -1107,7 +1107,7 @@ IsDropSchemaOrDB(Node *parsetree)
* CoordinatedTransactionCallback function. * CoordinatedTransactionCallback function.
* *
* The function errors out if the DDL is on a partitioned table which has replication * The function errors out if the DDL is on a partitioned table which has replication
* factor > 1 or if the the coordinator is not added into metadata and we're on a * factor > 1, or if the the coordinator is not added into metadata and we're on a
* worker node because we want to make sure that distributed DDL jobs are executed * worker node because we want to make sure that distributed DDL jobs are executed
* on the coordinator node too. See EnsurePropagationToCoordinator() for more details. * on the coordinator node too. See EnsurePropagationToCoordinator() for more details.
*/ */
@ -1140,23 +1140,23 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
{ {
if (shouldSyncMetadata) if (shouldSyncMetadata)
{ {
SendCommandToOtherNodesWithMetadata(DISABLE_DDL_PROPAGATION); SendCommandToRemoteNodesWithMetadata(DISABLE_DDL_PROPAGATION);
char *currentSearchPath = CurrentSearchPath(); char *currentSearchPath = CurrentSearchPath();
/* /*
* Given that we're relaying the query to the other nodes directly, * Given that we're relaying the query to the remote nodes directly,
* we should set the search path exactly the same when necessary. * we should set the search path exactly the same when necessary.
*/ */
if (currentSearchPath != NULL) if (currentSearchPath != NULL)
{ {
SendCommandToOtherNodesWithMetadata( SendCommandToRemoteNodesWithMetadata(
psprintf("SET LOCAL search_path TO %s;", currentSearchPath)); psprintf("SET LOCAL search_path TO %s;", currentSearchPath));
} }
if (ddlJob->metadataSyncCommand != NULL) if (ddlJob->metadataSyncCommand != NULL)
{ {
SendCommandToOtherNodesWithMetadata( SendCommandToRemoteNodesWithMetadata(
(char *) ddlJob->metadataSyncCommand); (char *) ddlJob->metadataSyncCommand);
} }
} }
@ -1236,7 +1236,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
char *currentSearchPath = CurrentSearchPath(); char *currentSearchPath = CurrentSearchPath();
/* /*
* Given that we're relaying the query to the other nodes directly, * Given that we're relaying the query to the remote nodes directly,
* we should set the search path exactly the same when necessary. * we should set the search path exactly the same when necessary.
*/ */
if (currentSearchPath != NULL) if (currentSearchPath != NULL)
@ -1248,7 +1248,7 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
commandList = lappend(commandList, (char *) ddlJob->metadataSyncCommand); commandList = lappend(commandList, (char *) ddlJob->metadataSyncCommand);
SendBareCommandListToOtherMetadataNodes(commandList); SendBareCommandListToRemoteMetadataNodes(commandList);
} }
} }
PG_CATCH(); PG_CATCH();

View File

@ -302,8 +302,8 @@ EnsureConnectionPossibilityForRemotePrimaryNodes(void)
* seem to cause any problems as none of the placements that we are * seem to cause any problems as none of the placements that we are
* going to access would be on the new node. * going to access would be on the new node.
*/ */
List *otherNodes = ActivePrimaryOtherNodesList(NoLock); List *remoteNodeList = ActivePrimaryRemoteNodeList(NoLock);
EnsureConnectionPossibilityForNodeList(otherNodes); EnsureConnectionPossibilityForNodeList(remoteNodeList);
} }

View File

@ -149,7 +149,7 @@ ObjectExists(const ObjectAddress *address)
/* /*
* MarkObjectDistributed marks an object as a distributed object. Marking is done * MarkObjectDistributed marks an object as a distributed object. Marking is done
* by adding appropriate entries to citus.pg_dist_object and also marking the object * by adding appropriate entries to citus.pg_dist_object and also marking the object
* as distributed by opening a connection using current user to all other nodes * as distributed by opening a connection using current user to all remote nodes
* with metadata if object propagation is on. * with metadata if object propagation is on.
* *
* This function should be used if the user creating the given object. If you want * This function should be used if the user creating the given object. If you want
@ -164,7 +164,7 @@ MarkObjectDistributed(const ObjectAddress *distAddress)
{ {
char *workerPgDistObjectUpdateCommand = char *workerPgDistObjectUpdateCommand =
CreatePgDistObjectEntryCommand(distAddress); CreatePgDistObjectEntryCommand(distAddress);
SendCommandToOtherNodesWithMetadata(workerPgDistObjectUpdateCommand); SendCommandToRemoteNodesWithMetadata(workerPgDistObjectUpdateCommand);
} }
} }
@ -172,7 +172,7 @@ MarkObjectDistributed(const ObjectAddress *distAddress)
/* /*
* MarkObjectDistributedViaSuperUser marks an object as a distributed object. Marking * MarkObjectDistributedViaSuperUser marks an object as a distributed object. Marking
* is done by adding appropriate entries to citus.pg_dist_object and also marking the * is done by adding appropriate entries to citus.pg_dist_object and also marking the
* object as distributed by opening a connection using super user to all other nodes * object as distributed by opening a connection using super user to all remote nodes
* with metadata if object propagation is on. * with metadata if object propagation is on.
* *
* This function should be used to mark dependent object as distributed. If you want * This function should be used to mark dependent object as distributed. If you want
@ -187,7 +187,7 @@ MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress)
{ {
char *workerPgDistObjectUpdateCommand = char *workerPgDistObjectUpdateCommand =
CreatePgDistObjectEntryCommand(distAddress); CreatePgDistObjectEntryCommand(distAddress);
SendCommandToOtherNodesWithMetadataViaSuperUser(workerPgDistObjectUpdateCommand); SendCommandToRemoteNodesWithMetadataViaSuperUser(workerPgDistObjectUpdateCommand);
} }
} }

View File

@ -179,11 +179,11 @@ ActivePrimaryNodeList(LOCKMODE lockMode)
/* /*
* ActivePrimaryOtherNodesList returns a list of all active primary nodes in * ActivePrimaryRemoteNodeList returns a list of all active primary nodes in
* workerNodeHash except the local one. * workerNodeHash except the local one.
*/ */
List * List *
ActivePrimaryOtherNodesList(LOCKMODE lockMode) ActivePrimaryRemoteNodeList(LOCKMODE lockMode)
{ {
EnsureModificationsCanRun(); EnsureModificationsCanRun();
return FilterActiveNodeListFunc(lockMode, NodeIsPrimaryAndRemote); return FilterActiveNodeListFunc(lockMode, NodeIsPrimaryAndRemote);

View File

@ -34,10 +34,10 @@
#include "utils/memutils.h" #include "utils/memutils.h"
#include "utils/builtins.h" #include "utils/builtins.h"
static void SendCommandToOtherMetadataNodesParams(const char *command, static void SendCommandToRemoteMetadataNodesParams(const char *command,
const char *user, int parameterCount, const char *user, int parameterCount,
const Oid *parameterTypes, const Oid *parameterTypes,
const char *const *parameterValues); const char *const *parameterValues);
static void SendBareCommandListToMetadataNodesInternal(List *commandList, static void SendBareCommandListToMetadataNodesInternal(List *commandList,
TargetWorkerSet targetWorkerSet); TargetWorkerSet targetWorkerSet);
static void SendCommandToMetadataWorkersParams(const char *command, static void SendCommandToMetadataWorkersParams(const char *command,
@ -157,20 +157,20 @@ SendCommandListToWorkersWithMetadata(List *commands)
/* /*
* SendCommandToOtherNodesWithMetadata sends a command to other nodes in * SendCommandToRemoteNodesWithMetadata sends a command to remote nodes in
* parallel. Commands are committed on the nodes when the local transaction * parallel. Commands are committed on the nodes when the local transaction
* commits. * commits.
*/ */
void void
SendCommandToOtherNodesWithMetadata(const char *command) SendCommandToRemoteNodesWithMetadata(const char *command)
{ {
SendCommandToOtherMetadataNodesParams(command, CurrentUserName(), SendCommandToRemoteMetadataNodesParams(command, CurrentUserName(),
0, NULL, NULL); 0, NULL, NULL);
} }
/* /*
* SendCommandToOtherNodesWithMetadataViaSuperUser sends a command to other * SendCommandToRemoteNodesWithMetadataViaSuperUser sends a command to remote
* nodes in parallel by opening a super user connection. Commands are committed * nodes in parallel by opening a super user connection. Commands are committed
* on the nodes when the local transaction commits. The connection are made as * on the nodes when the local transaction commits. The connection are made as
* the extension owner to ensure write access to the Citus metadata tables. * the extension owner to ensure write access to the Citus metadata tables.
@ -180,38 +180,38 @@ SendCommandToOtherNodesWithMetadata(const char *command)
* tuples for dependent objects. * tuples for dependent objects.
*/ */
void void
SendCommandToOtherNodesWithMetadataViaSuperUser(const char *command) SendCommandToRemoteNodesWithMetadataViaSuperUser(const char *command)
{ {
SendCommandToOtherMetadataNodesParams(command, CitusExtensionOwnerName(), SendCommandToRemoteMetadataNodesParams(command, CitusExtensionOwnerName(),
0, NULL, NULL); 0, NULL, NULL);
} }
/* /*
* SendCommandListToOtherNodesWithMetadata sends all commands to other nodes * SendCommandListToRemoteNodesWithMetadata sends all commands to remote nodes
* with the current user. See `SendCommandToOtherNodesWithMetadata`for details. * with the current user. See `SendCommandToRemoteNodesWithMetadata`for details.
*/ */
void void
SendCommandListToOtherNodesWithMetadata(List *commands) SendCommandListToRemoteNodesWithMetadata(List *commands)
{ {
char *command = NULL; char *command = NULL;
foreach_ptr(command, commands) foreach_ptr(command, commands)
{ {
SendCommandToOtherNodesWithMetadata(command); SendCommandToRemoteNodesWithMetadata(command);
} }
} }
/* /*
* SendCommandToOtherMetadataNodesParams is a wrapper around * SendCommandToRemoteMetadataNodesParams is a wrapper around
* SendCommandToWorkersParamsInternal() that can be used to send commands * SendCommandToWorkersParamsInternal() that can be used to send commands
* to other metadata nodes. * to remote metadata nodes.
*/ */
static void static void
SendCommandToOtherMetadataNodesParams(const char *command, SendCommandToRemoteMetadataNodesParams(const char *command,
const char *user, int parameterCount, const char *user, int parameterCount,
const Oid *parameterTypes, const Oid *parameterTypes,
const char *const *parameterValues) const char *const *parameterValues)
{ {
/* use METADATA_NODES so that ErrorIfAnyMetadataNodeOutOfSync checks local node as well */ /* use METADATA_NODES so that ErrorIfAnyMetadataNodeOutOfSync checks local node as well */
List *workerNodeList = TargetWorkerSetNodeList(METADATA_NODES, List *workerNodeList = TargetWorkerSetNodeList(METADATA_NODES,
@ -219,7 +219,7 @@ SendCommandToOtherMetadataNodesParams(const char *command,
ErrorIfAnyMetadataNodeOutOfSync(workerNodeList); ErrorIfAnyMetadataNodeOutOfSync(workerNodeList);
SendCommandToWorkersParamsInternal(OTHER_METADATA_NODES, command, user, SendCommandToWorkersParamsInternal(REMOTE_METADATA_NODES, command, user,
parameterCount, parameterTypes, parameterValues); parameterCount, parameterTypes, parameterValues);
} }
@ -236,9 +236,9 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
{ {
workerNodeList = ActivePrimaryNodeList(lockMode); workerNodeList = ActivePrimaryNodeList(lockMode);
} }
else if (targetWorkerSet == OTHER_NODES || targetWorkerSet == OTHER_METADATA_NODES) else if (targetWorkerSet == REMOTE_NODES || targetWorkerSet == REMOTE_METADATA_NODES)
{ {
workerNodeList = ActivePrimaryOtherNodesList(lockMode); workerNodeList = ActivePrimaryRemoteNodeList(lockMode);
} }
else if (targetWorkerSet == NON_COORDINATOR_METADATA_NODES || else if (targetWorkerSet == NON_COORDINATOR_METADATA_NODES ||
targetWorkerSet == NON_COORDINATOR_NODES) targetWorkerSet == NON_COORDINATOR_NODES)
@ -257,7 +257,7 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
foreach_ptr(workerNode, workerNodeList) foreach_ptr(workerNode, workerNodeList)
{ {
if ((targetWorkerSet == NON_COORDINATOR_METADATA_NODES || if ((targetWorkerSet == NON_COORDINATOR_METADATA_NODES ||
targetWorkerSet == OTHER_METADATA_NODES || targetWorkerSet == REMOTE_METADATA_NODES ||
targetWorkerSet == METADATA_NODES) && targetWorkerSet == METADATA_NODES) &&
!workerNode->hasMetadata) !workerNode->hasMetadata)
{ {
@ -272,15 +272,15 @@ TargetWorkerSetNodeList(TargetWorkerSet targetWorkerSet, LOCKMODE lockMode)
/* /*
* SendBareCommandListToOtherMetadataNodes is a wrapper around * SendBareCommandListToRemoteMetadataNodes is a wrapper around
* SendBareCommandListToMetadataNodesInternal() that can be used to send * SendBareCommandListToMetadataNodesInternal() that can be used to send
* bare commands to other metadata nodes. * bare commands to remote metadata nodes.
*/ */
void void
SendBareCommandListToOtherMetadataNodes(List *commandList) SendBareCommandListToRemoteMetadataNodes(List *commandList)
{ {
SendBareCommandListToMetadataNodesInternal(commandList, SendBareCommandListToMetadataNodesInternal(commandList,
OTHER_METADATA_NODES); REMOTE_METADATA_NODES);
} }

View File

@ -71,7 +71,7 @@ extern uint32 ActivePrimaryNonCoordinatorNodeCount(void);
extern uint32 ActiveReadableNodeCount(void); extern uint32 ActiveReadableNodeCount(void);
extern List * ActivePrimaryNonCoordinatorNodeList(LOCKMODE lockMode); extern List * ActivePrimaryNonCoordinatorNodeList(LOCKMODE lockMode);
extern List * ActivePrimaryNodeList(LOCKMODE lockMode); extern List * ActivePrimaryNodeList(LOCKMODE lockMode);
extern List * ActivePrimaryOtherNodesList(LOCKMODE lockMode); extern List * ActivePrimaryRemoteNodeList(LOCKMODE lockMode);
extern bool CoordinatorAddedAsWorkerNode(void); extern bool CoordinatorAddedAsWorkerNode(void);
extern List * ReferenceTablePlacementNodeList(LOCKMODE lockMode); extern List * ReferenceTablePlacementNodeList(LOCKMODE lockMode);
extern WorkerNode * CoordinatorNodeIfAddedAsWorkerOrError(void); extern WorkerNode * CoordinatorNodeIfAddedAsWorkerOrError(void);

View File

@ -33,7 +33,7 @@ typedef enum TargetWorkerSet
* All the active primary nodes in the metadata which have metadata * All the active primary nodes in the metadata which have metadata
* except the local node * except the local node
*/ */
OTHER_METADATA_NODES, REMOTE_METADATA_NODES,
/* /*
* All the active primary nodes in the metadata except the coordinator * All the active primary nodes in the metadata except the coordinator
@ -43,7 +43,7 @@ typedef enum TargetWorkerSet
/* /*
* All the active primary nodes in the metadata except the local node * All the active primary nodes in the metadata except the local node
*/ */
OTHER_NODES, REMOTE_NODES,
/* /*
* All active primary nodes in the metadata * All active primary nodes in the metadata
@ -85,10 +85,10 @@ extern bool SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(cons
extern void SendCommandToWorkersWithMetadata(const char *command); extern void SendCommandToWorkersWithMetadata(const char *command);
extern void SendCommandToWorkersWithMetadataViaSuperUser(const char *command); extern void SendCommandToWorkersWithMetadataViaSuperUser(const char *command);
extern void SendCommandListToWorkersWithMetadata(List *commands); extern void SendCommandListToWorkersWithMetadata(List *commands);
extern void SendCommandToOtherNodesWithMetadata(const char *command); extern void SendCommandToRemoteNodesWithMetadata(const char *command);
extern void SendCommandToOtherNodesWithMetadataViaSuperUser(const char *command); extern void SendCommandToRemoteNodesWithMetadataViaSuperUser(const char *command);
extern void SendCommandListToOtherNodesWithMetadata(List *commands); extern void SendCommandListToRemoteNodesWithMetadata(List *commands);
extern void SendBareCommandListToOtherMetadataNodes(List *commandList); extern void SendBareCommandListToRemoteMetadataNodes(List *commandList);
extern void SendBareCommandListToMetadataWorkers(List *commandList); extern void SendBareCommandListToMetadataWorkers(List *commandList);
extern void EnsureNoModificationsHaveBeenDone(void); extern void EnsureNoModificationsHaveBeenDone(void);
extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName, extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName,