diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index e3310c5c8..8d4a02b6d 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -36,16 +36,18 @@ #include "catalog/pg_proc.h" #include "catalog/pg_type.h" #include "commands/async.h" +#include "distributed/adaptive_executor.h" #include "distributed/argutils.h" #include "distributed/backend_data.h" #include "distributed/citus_ruleutils.h" #include "distributed/colocation_utils.h" #include "distributed/commands.h" +#include "distributed/coordinator_protocol.h" #include "distributed/deparser.h" +#include "distributed/deparse_shard_query.h" #include "distributed/distribution_column.h" #include "distributed/listutils.h" #include "distributed/metadata_utility.h" -#include "distributed/coordinator_protocol.h" #include "distributed/maintenanced.h" #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" @@ -91,6 +93,7 @@ /* managed via a GUC */ char *EnableManualMetadataChangesForUser = ""; int MetadataSyncTransMode = METADATA_SYNC_TRANSACTIONAL; +int MetadataSyncBatchCount = METADATA_SYNC_DEFAULT_BATCH_COUNT; static void EnsureObjectMetadataIsSane(int distributionArgumentIndex, @@ -146,6 +149,8 @@ static char * ColocationGroupCreateCommand(uint32 colocationId, int shardCount, static char * ColocationGroupDeleteCommand(uint32 colocationId); static char * RemoteTypeIdExpression(Oid typeId); static char * RemoteCollationIdExpression(Oid colocationId); +static bool ExceedsSyncBatchCount(MetadataSyncContext *context); +static void ExecuteCommandListOnWorkerList(List *commandList, List *workerList); PG_FUNCTION_INFO_V1(start_metadata_sync_to_all_nodes); @@ -201,11 +206,11 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS) * It contains activated nodes, bare connections if the mode is nontransactional, * and a memory context for allocation. */ - bool collectCommands = false; bool nodesAddedInSameTransaction = false; + bool noConnectionMode = false; MetadataSyncContext *context = CreateMetadataSyncContext(list_make1(workerNode), - collectCommands, - nodesAddedInSameTransaction); + nodesAddedInSameTransaction, + noConnectionMode); ActivateNodeList(context); TransactionModifiedNodeMetadata = true; @@ -234,11 +239,11 @@ start_metadata_sync_to_all_nodes(PG_FUNCTION_ARGS) * It contains activated nodes, bare connections if the mode is nontransactional, * and a memory context for allocation. */ - bool collectCommands = false; bool nodesAddedInSameTransaction = false; + bool noConnectionMode = false; MetadataSyncContext *context = CreateMetadataSyncContext(nodeList, - collectCommands, - nodesAddedInSameTransaction); + nodesAddedInSameTransaction, + noConnectionMode); ActivateNodeList(context); TransactionModifiedNodeMetadata = true; @@ -4008,15 +4013,15 @@ EstablishAndSetMetadataSyncBareConnections(MetadataSyncContext *context) * CreateMetadataSyncContext creates a context which contains worker connections * and a MemoryContext to be used throughout the metadata sync. * - * If we collect commands, connections will not be established as caller's intent - * is to collect sync commands. + * If noConnectionMode is set, we do not establish connection. Instead, we just + * collect commands. * * If the nodes are newly added before activation, we would not try to unset * metadatasynced in separate transaction during nontransactional metadatasync. */ MetadataSyncContext * -CreateMetadataSyncContext(List *nodeList, bool collectCommands, - bool nodesAddedInSameTransaction) +CreateMetadataSyncContext(List *nodeList, bool nodesAddedInSameTransaction, + bool noConnectionMode) { /* should be alive during local transaction during the sync */ MemoryContext context = AllocSetContextCreate(TopTransactionContext, @@ -4028,18 +4033,18 @@ CreateMetadataSyncContext(List *nodeList, bool collectCommands, metadataSyncContext->context = context; metadataSyncContext->transactionMode = MetadataSyncTransMode; - metadataSyncContext->collectCommands = collectCommands; metadataSyncContext->collectedCommands = NIL; metadataSyncContext->nodesAddedInSameTransaction = nodesAddedInSameTransaction; + metadataSyncContext->noConnectionMode = noConnectionMode; /* filter the nodes that needs to be activated from given node list */ SetMetadataSyncNodesFromNodeList(metadataSyncContext, nodeList); /* - * establish connections only for nontransactional mode to prevent connection - * open-close for each command + * Establish connections only for nontransactional mode to prevent connection + * open-close for each command. */ - if (!collectCommands && MetadataSyncTransMode == METADATA_SYNC_NON_TRANSACTIONAL) + if (!noConnectionMode && MetadataSyncTransMode == METADATA_SYNC_NON_TRANSACTIONAL) { EstablishAndSetMetadataSyncBareConnections(metadataSyncContext); } @@ -4055,53 +4060,160 @@ CreateMetadataSyncContext(List *nodeList, bool collectCommands, /* - * ResetMetadataSyncMemoryContext resets memory context inside metadataSyncContext, if - * we are not collecting commands. + * ExceedsSyncBatchCount returns whether given context accumulated + * more than METADATA_SYNC_DEFAULT_BATCH_COUNT. */ -void -ResetMetadataSyncMemoryContext(MetadataSyncContext *context) +static bool +ExceedsSyncBatchCount(MetadataSyncContext *context) { - if (!MetadataSyncCollectsCommands(context)) + int collectedCommandCount = list_length(context->collectedCommands); + return collectedCommandCount > METADATA_SYNC_DEFAULT_BATCH_COUNT; +} + + +/* + * ExecuteCommandListOnWorkerList executes a command list on all nodes + * in the given list. + */ +static void +ExecuteCommandListOnWorkerList(List *commandList, List *workerList) +{ + List *taskList = NIL; + + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, workerList) { - MemoryContextReset(context->context); + ShardPlacement *placement = CitusMakeNode(ShardPlacement); + SetPlacementNodeMetadata(placement, workerNode); + + Task *task = CitusMakeNode(Task); + task->taskType = DDL_TASK; + SetTaskQueryStringList(task, commandList); + task->taskPlacementList = list_make1(placement); + + taskList = lappend(taskList, task); } + + bool localExecutionSupported = false; + + ExecuteUtilityTaskList(taskList, localExecutionSupported); } /* - * MetadataSyncCollectsCommands returns whether context is used for collecting - * commands instead of sending them to workers. - */ -bool -MetadataSyncCollectsCommands(MetadataSyncContext *context) -{ - return context->collectCommands; -} - - -/* - * SendOrCollectCommandListToActivatedNodes sends the commands to the activated nodes with - * bare connections inside metadatacontext or via coordinated connections. - * Note that when context only collects commands, we add commands into the context - * without sending the commands. + * ProcessBatchCommandsToActivatedNodes processes collected commands inside + * metadataSyncContext. Sends commands to activated workers if batch limit + * is exceeded or user forces. If noConnectionMode is set, it would not process + * the commands. */ void -SendOrCollectCommandListToActivatedNodes(MetadataSyncContext *context, List *commands) +ProcessBatchCommandsToActivatedNodes(MetadataSyncContext *context, bool forceSend) { - /* do nothing if no commands */ - if (commands == NIL) + /* do not send and reset commands if we are in noConnectionMode */ + if (MetadataSyncInNoConnectionMode(context)) { return; } - /* - * do not send any command to workers if we collect commands. - * Collect commands into metadataSyncContext's collected command - * list. - */ - if (MetadataSyncCollectsCommands(context)) + if (forceSend || ExceedsSyncBatchCount(context)) + { + SendCollectedCommandsToActivatedNodes(context); + ResetMetadataSyncMemoryContext(context); + } +} + + +/* + * ProcessBatchCommandsToSingleNode does the same operation as + * ProcessBatchCommandsToActivatedNodes but for metadata nodes. + */ +void +ProcessBatchCommandsToMetadataNodes(MetadataSyncContext *context, bool forceSend) +{ + /* do not send and reset commands if we are in noConnectionMode */ + if (MetadataSyncInNoConnectionMode(context)) + { + return; + } + + if (forceSend || ExceedsSyncBatchCount(context)) + { + SendCollectedCommandsToMetadataNodes(context); + ResetMetadataSyncMemoryContext(context); + } +} + + +/* + * ProcessBatchCommandsToSingleNode does the same as operation + * ProcessBatchCommandsToActivatedNodes but for only given node. + */ +void +ProcessBatchCommandsToSingleNode(MetadataSyncContext *context, int nodeIdx, + bool forceSend) +{ + /* do not send and reset commands if we are in noConnectionMode */ + if (MetadataSyncInNoConnectionMode(context)) + { + return; + } + + if (forceSend || ExceedsSyncBatchCount(context)) + { + SendCollectedCommandsToSingleNode(context, nodeIdx); + ResetMetadataSyncMemoryContext(context); + } +} + + +/* + * ResetMetadataSyncMemoryContext resets memory context inside metadataSyncContext. + */ +void +ResetMetadataSyncMemoryContext(MetadataSyncContext *context) +{ + Assert(!MetadataSyncInNoConnectionMode(context)); + MemoryContextReset(context->context); + context->collectedCommands = NIL; +} + + +/* + * CollectCommandIntoMetadataSyncContext collcets given commands into metadataSyncContext. + */ +void +CollectCommandIntoMetadataSyncContext(MetadataSyncContext *context, List *commandList) +{ + context->collectedCommands = list_concat(context->collectedCommands, commandList); +} + + +/* + * MetadataSyncInNoConnectionMode returns whether context is used for collecting + * commands instead of sending them to workers. + */ +bool +MetadataSyncInNoConnectionMode(MetadataSyncContext *context) +{ + return context->noConnectionMode; +} + + +/* + * SendCollectedCommandsToActivatedNodes sends collected commands to the activated + * nodes with bare connections inside metadatacontext or via coordinated connections. + * Note that when we are in noConnectionMode, we add commands into the context without + * sending the commands. + */ +void +SendCollectedCommandsToActivatedNodes(MetadataSyncContext *context) +{ + Assert(!MetadataSyncInNoConnectionMode(context)); + + /* do nothing if no commands */ + List *commands = context->collectedCommands; + if (commands == NIL) { - context->collectedCommands = list_concat(context->collectedCommands, commands); return; } @@ -4111,9 +4223,11 @@ SendOrCollectCommandListToActivatedNodes(MetadataSyncContext *context, List *com if (context->transactionMode == METADATA_SYNC_TRANSACTIONAL) { List *workerNodes = context->activatedWorkerNodeList; - SendMetadataCommandListToWorkerListInCoordinatedTransaction(workerNodes, - CurrentUserName(), - commands); + ExecuteCommandListOnWorkerList(commands, workerNodes); + + /* SendMetadataCommandListToWorkerListInCoordinatedTransaction(workerNodes, */ + /* CurrentUserName(), */ + /* commands); */ } else if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL) { @@ -4128,22 +4242,20 @@ SendOrCollectCommandListToActivatedNodes(MetadataSyncContext *context, List *com /* - * SendOrCollectCommandListToMetadataNodes sends the commands to the metadata nodes with - * bare connections inside metadatacontext or via coordinated connections. - * Note that when context only collects commands, we add commands into the context - * without sending the commands. + * SendCollectedCommandsToMetadataNodes sends collected commands to the metadata + * nodes with bare connections inside metadatacontext or via coordinated connections. + * Note that when we are in noConnectionMode, we add commands into the context without + * sending the commands. */ void -SendOrCollectCommandListToMetadataNodes(MetadataSyncContext *context, List *commands) +SendCollectedCommandsToMetadataNodes(MetadataSyncContext *context) { - /* - * do not send any command to workers if we collcet commands. - * Collect commands into metadataSyncContext's collected command - * list. - */ - if (MetadataSyncCollectsCommands(context)) + Assert(!MetadataSyncInNoConnectionMode(context)); + + /* do nothing if no commands */ + List *commands = context->collectedCommands; + if (commands == NIL) { - context->collectedCommands = list_concat(context->collectedCommands, commands); return; } @@ -4154,9 +4266,11 @@ SendOrCollectCommandListToMetadataNodes(MetadataSyncContext *context, List *comm { List *metadataNodes = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES, RowShareLock); - SendMetadataCommandListToWorkerListInCoordinatedTransaction(metadataNodes, - CurrentUserName(), - commands); + ExecuteCommandListOnWorkerList(commands, metadataNodes); + + /* SendMetadataCommandListToWorkerListInCoordinatedTransaction(metadataNodes, */ + /* CurrentUserName(), */ + /* commands); */ } else if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL) { @@ -4170,23 +4284,20 @@ SendOrCollectCommandListToMetadataNodes(MetadataSyncContext *context, List *comm /* - * SendOrCollectCommandListToSingleNode sends the commands to the specific worker + * SendCollectedCommandsToSingleNode sends collected commands to the specific worker * indexed by nodeIdx with bare connection inside metadatacontext or via coordinated - * connection. Note that when context only collects commands, we add commands into + * connection. Note that when we are in noConnectionMode, we add commands into * the context without sending the commands. */ void -SendOrCollectCommandListToSingleNode(MetadataSyncContext *context, List *commands, - int nodeIdx) +SendCollectedCommandsToSingleNode(MetadataSyncContext *context, int nodeIdx) { - /* - * Do not send any command to workers if we collect commands. - * Collect commands into metadataSyncContext's collected command - * list. - */ - if (MetadataSyncCollectsCommands(context)) + Assert(!MetadataSyncInNoConnectionMode(context)); + + /* do nothing if no commands */ + List *commands = context->collectedCommands; + if (commands == NIL) { - context->collectedCommands = list_concat(context->collectedCommands, commands); return; } @@ -4199,9 +4310,11 @@ SendOrCollectCommandListToSingleNode(MetadataSyncContext *context, List *command Assert(nodeIdx < list_length(workerNodes)); WorkerNode *node = list_nth(workerNodes, nodeIdx); - SendMetadataCommandListToWorkerListInCoordinatedTransaction(list_make1(node), - CurrentUserName(), - commands); + ExecuteCommandListOnWorkerList(commands, list_make1(node)); + + /* SendMetadataCommandListToWorkerListInCoordinatedTransaction(list_make1(node), */ + /* CurrentUserName(), */ + /* commands); */ } else if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL) { @@ -4243,7 +4356,6 @@ WorkerDropAllShellTablesCommand(bool singleTransaction) static List * PropagateNodeWideObjectsCommandList(void) { - /* collect all commands */ List *ddlCommands = NIL; if (EnableAlterRoleSetPropagation) @@ -4330,17 +4442,24 @@ SyncDistributedObjects(MetadataSyncContext *context) void SendNodeWideObjectsSyncCommands(MetadataSyncContext *context) { + MemoryContext oldContext = MemoryContextSwitchTo(context->context); + /* propagate node wide objects. It includes only roles for now. */ List *commandList = PropagateNodeWideObjectsCommandList(); if (commandList == NIL) { + MemoryContextSwitchTo(oldContext); return; } commandList = lcons(DISABLE_DDL_PROPAGATION, commandList); commandList = lappend(commandList, ENABLE_DDL_PROPAGATION); - SendOrCollectCommandListToActivatedNodes(context, commandList); + + bool forceSend = true; + CollectCommandIntoMetadataSyncContext(context, commandList); + ProcessBatchCommandsToActivatedNodes(context, forceSend); + MemoryContextSwitchTo(oldContext); } @@ -4352,14 +4471,22 @@ SendNodeWideObjectsSyncCommands(MetadataSyncContext *context) void SendShellTableDeletionCommands(MetadataSyncContext *context) { + bool forceSend = true; + + MemoryContext oldContext = MemoryContextSwitchTo(context->context); + /* break all sequence deps for citus tables and remove all shell tables */ char *breakSeqDepsCommand = BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND; - SendOrCollectCommandListToActivatedNodes(context, list_make1(breakSeqDepsCommand)); + CollectCommandIntoMetadataSyncContext(context, list_make1(breakSeqDepsCommand)); + ProcessBatchCommandsToActivatedNodes(context, forceSend); /* remove shell tables */ bool singleTransaction = (context->transactionMode == METADATA_SYNC_TRANSACTIONAL); char *dropShellTablesCommand = WorkerDropAllShellTablesCommand(singleTransaction); - SendOrCollectCommandListToActivatedNodes(context, list_make1(dropShellTablesCommand)); + CollectCommandIntoMetadataSyncContext(context, list_make1(dropShellTablesCommand)); + ProcessBatchCommandsToActivatedNodes(context, forceSend); + + MemoryContextSwitchTo(oldContext); } @@ -4371,21 +4498,27 @@ SendShellTableDeletionCommands(MetadataSyncContext *context) void SendMetadataDeletionCommands(MetadataSyncContext *context) { + MemoryContext oldContext = MemoryContextSwitchTo(context->context); + /* remove pg_dist_partition entries */ - SendOrCollectCommandListToActivatedNodes(context, list_make1(DELETE_ALL_PARTITIONS)); + CollectCommandIntoMetadataSyncContext(context, list_make1(DELETE_ALL_PARTITIONS)); /* remove pg_dist_shard entries */ - SendOrCollectCommandListToActivatedNodes(context, list_make1(DELETE_ALL_SHARDS)); + CollectCommandIntoMetadataSyncContext(context, list_make1(DELETE_ALL_SHARDS)); /* remove pg_dist_placement entries */ - SendOrCollectCommandListToActivatedNodes(context, list_make1(DELETE_ALL_PLACEMENTS)); + CollectCommandIntoMetadataSyncContext(context, list_make1(DELETE_ALL_PLACEMENTS)); /* remove pg_dist_object entries */ - SendOrCollectCommandListToActivatedNodes(context, - list_make1(DELETE_ALL_DISTRIBUTED_OBJECTS)); + CollectCommandIntoMetadataSyncContext(context, + list_make1(DELETE_ALL_DISTRIBUTED_OBJECTS)); /* remove pg_dist_colocation entries */ - SendOrCollectCommandListToActivatedNodes(context, list_make1(DELETE_ALL_COLOCATION)); + CollectCommandIntoMetadataSyncContext(context, list_make1(DELETE_ALL_COLOCATION)); + + bool forceSend = true; + ProcessBatchCommandsToActivatedNodes(context, forceSend); + MemoryContextSwitchTo(oldContext); } @@ -4396,6 +4529,7 @@ SendMetadataDeletionCommands(MetadataSyncContext *context) void SendColocationMetadataCommands(MetadataSyncContext *context) { + bool forceSend = false; ScanKeyData scanKey[1]; int scanKeyCount = 0; @@ -4407,11 +4541,12 @@ SendColocationMetadataCommands(MetadataSyncContext *context) HeapTuple nextTuple = NULL; while (true) { - ResetMetadataSyncMemoryContext(context); - nextTuple = systable_getnext(scanDesc); if (!HeapTupleIsValid(nextTuple)) { + /* send if we have commands that are not sent */ + forceSend = true; + ProcessBatchCommandsToActivatedNodes(context, forceSend); break; } @@ -4476,7 +4611,8 @@ SendColocationMetadataCommands(MetadataSyncContext *context) " = c.collnamespace)"); List *commandList = list_make1(colocationGroupCreateCommand->data); - SendOrCollectCommandListToActivatedNodes(context, commandList); + CollectCommandIntoMetadataSyncContext(context, commandList); + ProcessBatchCommandsToActivatedNodes(context, forceSend); } MemoryContextSwitchTo(oldContext); @@ -4493,11 +4629,14 @@ SendColocationMetadataCommands(MetadataSyncContext *context) void SendDependencyCreationCommands(MetadataSyncContext *context) { - /* disable ddl propagation */ - SendOrCollectCommandListToActivatedNodes(context, - list_make1(DISABLE_DDL_PROPAGATION)); - - MemoryContext oldContext = MemoryContextSwitchTo(context->context); + /* + * We need to preserve collected dependencies as we reset memory context + * inside metadataSyncContext during commands generation below. + */ + MemoryContext dependencyContext = AllocSetContextCreate(CurrentMemoryContext, + "dependency context", + ALLOCSET_DEFAULT_SIZES); + MemoryContext oldContext = MemoryContextSwitchTo(dependencyContext); /* collect all dependencies in creation order and get their ddl commands */ List *dependencies = GetDistributedObjectAddressList(); @@ -4512,22 +4651,16 @@ SendDependencyCreationCommands(MetadataSyncContext *context) dependencies = OrderObjectAddressListInDependencyOrder(dependencies); - /* - * We need to create a subcontext as we reset the context after each dependency - * creation but we want to preserve all dependency objects at metadataSyncContext. - */ - MemoryContext commandsContext = AllocSetContextCreate(context->context, - "dependency commands context", - ALLOCSET_DEFAULT_SIZES); - MemoryContextSwitchTo(commandsContext); + MemoryContextSwitchTo(context->context); + + /* disable ddl propagation */ + bool forceSend = true; + CollectCommandIntoMetadataSyncContext(context, list_make1(DISABLE_DDL_PROPAGATION)); + ProcessBatchCommandsToActivatedNodes(context, forceSend); + ObjectAddress *dependency = NULL; foreach_ptr(dependency, dependencies) { - if (!MetadataSyncCollectsCommands(context)) - { - MemoryContextReset(commandsContext); - } - if (IsAnyObjectAddressOwnedByExtension(list_make1(dependency), NULL)) { /* @@ -4538,19 +4671,18 @@ SendDependencyCreationCommands(MetadataSyncContext *context) } /* dependency creation commands */ + forceSend = false; List *ddlCommands = GetAllDependencyCreateDDLCommands(list_make1(dependency)); - SendOrCollectCommandListToActivatedNodes(context, ddlCommands); + CollectCommandIntoMetadataSyncContext(context, ddlCommands); + ProcessBatchCommandsToActivatedNodes(context, forceSend); } - MemoryContextSwitchTo(oldContext); - - if (!MetadataSyncCollectsCommands(context)) - { - MemoryContextDelete(commandsContext); - } - ResetMetadataSyncMemoryContext(context); /* enable ddl propagation */ - SendOrCollectCommandListToActivatedNodes(context, list_make1(ENABLE_DDL_PROPAGATION)); + forceSend = true; + CollectCommandIntoMetadataSyncContext(context, list_make1(ENABLE_DDL_PROPAGATION)); + ProcessBatchCommandsToActivatedNodes(context, forceSend); + MemoryContextSwitchTo(oldContext); + MemoryContextDelete(dependencyContext); } @@ -4562,6 +4694,7 @@ SendDependencyCreationCommands(MetadataSyncContext *context) void SendDistTableMetadataCommands(MetadataSyncContext *context) { + bool forceSend = false; ScanKeyData scanKey[1]; int scanKeyCount = 0; @@ -4575,11 +4708,12 @@ SendDistTableMetadataCommands(MetadataSyncContext *context) HeapTuple nextTuple = NULL; while (true) { - ResetMetadataSyncMemoryContext(context); - nextTuple = systable_getnext(scanDesc); if (!HeapTupleIsValid(nextTuple)) { + /* send if we have commands that are not sent */ + forceSend = true; + ProcessBatchCommandsToActivatedNodes(context, forceSend); break; } @@ -4594,7 +4728,8 @@ SendDistTableMetadataCommands(MetadataSyncContext *context) } List *commandList = CitusTableMetadataCreateCommandList(relationId); - SendOrCollectCommandListToActivatedNodes(context, commandList); + CollectCommandIntoMetadataSyncContext(context, commandList); + ProcessBatchCommandsToActivatedNodes(context, forceSend); } MemoryContextSwitchTo(oldContext); @@ -4611,6 +4746,7 @@ SendDistTableMetadataCommands(MetadataSyncContext *context) void SendDistObjectCommands(MetadataSyncContext *context) { + bool forceSend = false; ScanKeyData scanKey[1]; int scanKeyCount = 0; @@ -4624,11 +4760,12 @@ SendDistObjectCommands(MetadataSyncContext *context) HeapTuple nextTuple = NULL; while (true) { - ResetMetadataSyncMemoryContext(context); - nextTuple = systable_getnext(scanDesc); if (!HeapTupleIsValid(nextTuple)) { + /* send if we have commands that are not sent */ + forceSend = true; + ProcessBatchCommandsToActivatedNodes(context, forceSend); break; } @@ -4678,13 +4815,15 @@ SendDistObjectCommands(MetadataSyncContext *context) forceDelegation = NO_FORCE_PUSHDOWN; } + forceSend = false; char *workerMetadataUpdateCommand = MarkObjectsDistributedCreateCommand(list_make1(address), list_make1_int(distributionArgumentIndex), list_make1_int(colocationId), list_make1_int(forceDelegation)); - SendOrCollectCommandListToActivatedNodes(context, - list_make1(workerMetadataUpdateCommand)); + List *commandList = list_make1(workerMetadataUpdateCommand); + CollectCommandIntoMetadataSyncContext(context, commandList); + ProcessBatchCommandsToActivatedNodes(context, forceSend); } MemoryContextSwitchTo(oldContext); @@ -4702,10 +4841,6 @@ SendDistObjectCommands(MetadataSyncContext *context) void SendInterTableRelationshipCommands(MetadataSyncContext *context) { - /* disable ddl propagation */ - SendOrCollectCommandListToActivatedNodes(context, - list_make1(DISABLE_DDL_PROPAGATION)); - ScanKeyData scanKey[1]; int scanKeyCount = 0; @@ -4716,11 +4851,15 @@ SendInterTableRelationshipCommands(MetadataSyncContext *context) scanKeyCount, scanKey); MemoryContext oldContext = MemoryContextSwitchTo(context->context); + + /* disable ddl propagation */ + bool forceSend = true; + CollectCommandIntoMetadataSyncContext(context, list_make1(DISABLE_DDL_PROPAGATION)); + ProcessBatchCommandsToActivatedNodes(context, forceSend); + HeapTuple nextTuple = NULL; while (true) { - ResetMetadataSyncMemoryContext(context); - nextTuple = systable_getnext(scanDesc); if (!HeapTupleIsValid(nextTuple)) { @@ -4742,14 +4881,18 @@ SendInterTableRelationshipCommands(MetadataSyncContext *context) continue; } + forceSend = false; List *commandList = InterTableRelationshipOfRelationCommandList(relationId); - SendOrCollectCommandListToActivatedNodes(context, commandList); + CollectCommandIntoMetadataSyncContext(context, commandList); + ProcessBatchCommandsToActivatedNodes(context, forceSend); } + + /* enable ddl propagation */ + forceSend = true; + CollectCommandIntoMetadataSyncContext(context, list_make1(ENABLE_DDL_PROPAGATION)); + ProcessBatchCommandsToActivatedNodes(context, forceSend); MemoryContextSwitchTo(oldContext); systable_endscan(scanDesc); table_close(relation, AccessShareLock); - - /* enable ddl propagation */ - SendOrCollectCommandListToActivatedNodes(context, list_make1(ENABLE_DDL_PROPAGATION)); } diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 2639b79f0..3be10b8b3 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -762,11 +762,11 @@ citus_activate_node(PG_FUNCTION_ARGS) * It contains activated nodes, bare connections if the mode is nontransactional, * and a memory context for allocation. */ - bool collectCommands = false; bool nodesAddedInSameTransaction = false; + bool noConnectionMode = false; MetadataSyncContext *context = CreateMetadataSyncContext(list_make1(workerNode), - collectCommands, - nodesAddedInSameTransaction); + nodesAddedInSameTransaction, + noConnectionMode); ActivateNodeList(context); TransactionModifiedNodeMetadata = true; @@ -932,7 +932,7 @@ MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context, pid_t parentSessionPid) { Assert(context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL); - Assert(!MetadataSyncCollectsCommands(context)); + Assert(!MetadataSyncInNoConnectionMode(context)); /* * Set metadatasynced to false for all activated nodes to mark the nodes as not synced @@ -1000,8 +1000,8 @@ MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context, static void SetNodeMetadata(MetadataSyncContext *context, bool localOnly) { - /* do not execute local transaction if we collect commands */ - if (!MetadataSyncCollectsCommands(context)) + /* do not execute local transaction if we are in noConnectionMode */ + if (!MetadataSyncInNoConnectionMode(context)) { List *updatedActivatedNodeList = NIL; @@ -1022,7 +1022,7 @@ SetNodeMetadata(MetadataSyncContext *context, bool localOnly) SetMetadataSyncNodesFromNodeList(context, updatedActivatedNodeList); } - if (!localOnly && EnableMetadataSync) + if (!localOnly) { WorkerNode *node = NULL; foreach_ptr(node, context->activatedWorkerNodeList) @@ -1040,7 +1040,7 @@ SetNodeMetadata(MetadataSyncContext *context, bool localOnly) * The function operates in 3 different modes according to transactionMode inside * metadataSyncContext. * - * 1. MetadataSyncCollectsCommands(context): + * 1. MetadataSyncInNoConnectionMode(context): * Only collect commands instead of sending them to workers, * 2. context.transactionMode == METADATA_SYNC_TRANSACTIONAL: * Send all commands using coordinated transaction, @@ -1090,15 +1090,14 @@ ActivateNodeList(MetadataSyncContext *context) /* * we need to unset metadatasynced flag to false at coordinator in separate - * transaction only at nontransactional sync mode and if we do not collect - * commands. + * transaction only at nontransactional sync mode and if are in noConnectionMode. * * We make sure we set the flag to false at the start of nontransactional * metadata sync to mark those nodes are not synced in case of a failure in * the middle of the sync. */ if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL && - !MetadataSyncCollectsCommands(context)) + !MetadataSyncInNoConnectionMode(context)) { MarkNodesNotSyncedInLoopBackConnection(context, MyProcPid); } @@ -2201,16 +2200,19 @@ AddNodeMetadataViaMetadataContext(char *nodeName, int32 nodePort, } List *nodeList = list_make1(node); - bool collectCommands = false; bool nodesAddedInSameTransaction = true; - MetadataSyncContext *context = CreateMetadataSyncContext(nodeList, collectCommands, - nodesAddedInSameTransaction); + bool noConnectionMode = false; + MetadataSyncContext *context = CreateMetadataSyncContext(nodeList, + nodesAddedInSameTransaction, + noConnectionMode); if (EnableMetadataSync) { + MemoryContext oldContext = MemoryContextSwitchTo(context->context); + /* send the delete command to all primary nodes with metadata */ char *nodeDeleteCommand = NodeDeleteCommand(node->nodeId); - SendOrCollectCommandListToMetadataNodes(context, list_make1(nodeDeleteCommand)); + CollectCommandIntoMetadataSyncContext(context, list_make1(nodeDeleteCommand)); /* finally prepare the insert command and send it to all primary nodes */ uint32 primariesWithMetadata = CountPrimariesWithMetadata(); @@ -2230,9 +2232,12 @@ AddNodeMetadataViaMetadataContext(char *nodeName, int32 nodePort, nodeInsertCommand = NodeListIdempotentInsertCommand(nodeList); } Assert(nodeInsertCommand != NULL); - SendOrCollectCommandListToMetadataNodes(context, - list_make1(nodeInsertCommand)); + CollectCommandIntoMetadataSyncContext(context, list_make1(nodeInsertCommand)); } + + bool forceSend = true; + ProcessBatchCommandsToMetadataNodes(context, forceSend); + MemoryContextSwitchTo(oldContext); } ActivateNodeList(context); @@ -2273,6 +2278,8 @@ static void SetNodeStateViaMetadataContext(MetadataSyncContext *context, WorkerNode *workerNode, Datum value) { + MemoryContext oldContext = MemoryContextSwitchTo(context->context); + char *isActiveCommand = GetMetadataSyncCommandToSetNodeColumn(workerNode, Anum_pg_dist_node_isactive, value); @@ -2285,7 +2292,10 @@ SetNodeStateViaMetadataContext(MetadataSyncContext *context, WorkerNode *workerN List *commandList = list_make3(isActiveCommand, metadatasyncedCommand, hasmetadataCommand); - SendOrCollectCommandListToMetadataNodes(context, commandList); + bool forceSend = true; + CollectCommandIntoMetadataSyncContext(context, commandList); + ProcessBatchCommandsToMetadataNodes(context, forceSend); + MemoryContextSwitchTo(oldContext); } @@ -3033,18 +3043,18 @@ ErrorIfAnyNodeNotExist(List *nodeList) static void UpdateLocalGroupIdsViaMetadataContext(MetadataSyncContext *context) { + MemoryContext oldContext = MemoryContextSwitchTo(context->context); + bool forceSend = true; int activatedPrimaryCount = list_length(context->activatedWorkerNodeList); int nodeIdx = 0; for (nodeIdx = 0; nodeIdx < activatedPrimaryCount; nodeIdx++) { WorkerNode *node = list_nth(context->activatedWorkerNodeList, nodeIdx); List *commandList = list_make1(LocalGroupIdUpdateCommand(node->groupId)); - - /* send commands to new workers, the current user should be a superuser */ - Assert(superuser()); - - SendOrCollectCommandListToSingleNode(context, commandList, nodeIdx); + CollectCommandIntoMetadataSyncContext(context, commandList); + ProcessBatchCommandsToSingleNode(context, nodeIdx, forceSend); } + MemoryContextSwitchTo(oldContext); } @@ -3089,7 +3099,7 @@ SyncNodeMetadata(MetadataSyncContext *context) * Do not fail when we call this method from activate_node_snapshot * from workers. */ - if (!MetadataSyncCollectsCommands(context)) + if (!MetadataSyncInNoConnectionMode(context)) { EnsureCoordinator(); } @@ -3099,6 +3109,8 @@ SyncNodeMetadata(MetadataSyncContext *context) LockRelationOid(DistNodeRelationId(), ExclusiveLock); + MemoryContext oldContext = MemoryContextSwitchTo(context->context); + /* generate the queries which drop the node metadata */ List *dropMetadataCommandList = NodeMetadataDropCommands(); @@ -3113,5 +3125,8 @@ SyncNodeMetadata(MetadataSyncContext *context) * We should have already added node metadata to metadata workers. Sync node * metadata just for activated workers. */ - SendOrCollectCommandListToActivatedNodes(context, recreateNodeSnapshotCommandList); + bool forceSend = true; + CollectCommandIntoMetadataSyncContext(context, recreateNodeSnapshotCommandList); + ProcessBatchCommandsToActivatedNodes(context, forceSend); + MemoryContextSwitchTo(oldContext); } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 23393078b..409b9589e 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1873,6 +1873,17 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomIntVariable( + "citus.metadata_sync_batch_size", + gettext_noop("Sets total number of commands in a batch for metadata syncs."), + gettext_noop("metadata sync commands are sent as batches configured by " + "the batch size to reduce network round trip delay."), + &MetadataSyncBatchCount, + METADATA_SYNC_DEFAULT_BATCH_COUNT, 1, 1000, + PGC_SUSET, + GUC_SUPERUSER_ONLY | GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + DefineCustomIntVariable( "citus.metadata_sync_interval", gettext_noop("Sets the time to wait between metadata syncs."), diff --git a/src/backend/distributed/test/metadata_sync.c b/src/backend/distributed/test/metadata_sync.c index 46d2303d6..579dcf6f3 100644 --- a/src/backend/distributed/test/metadata_sync.c +++ b/src/backend/distributed/test/metadata_sync.c @@ -51,14 +51,14 @@ activate_node_snapshot(PG_FUNCTION_ARGS) /* * Create MetadataSyncContext which is used throughout nodes' activation. - * As we set collectCommands to true, it would not create connections to workers. - * Instead it would collect and return sync commands to be sent to workers. + * As we set noConnectionMode to true, it would not establish connections to + * workers. Instead it would collect and return sync commands to be sent to workers. */ - bool collectCommands = true; bool nodesAddedInSameTransaction = false; + bool noConnectionMode = true; MetadataSyncContext *context = CreateMetadataSyncContext(list_make1(dummyWorkerNode), - collectCommands, - nodesAddedInSameTransaction); + nodesAddedInSameTransaction, + noConnectionMode); ActivateNodeList(context); diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 687ce02a7..6b2aec866 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -624,6 +624,7 @@ DeleteAllReplicatedTablePlacementsFromNodeGroupViaMetadataContext( } MemoryContext oldContext = MemoryContextSwitchTo(context->context); + bool forceSend = false; GroupShardPlacement *placement = NULL; foreach_ptr(placement, replicatedPlacementListForGroup) { @@ -633,18 +634,16 @@ DeleteAllReplicatedTablePlacementsFromNodeGroupViaMetadataContext( { char *deletePlacementCommand = DeleteShardPlacementCommand(placement->placementId); - - SendOrCollectCommandListToMetadataNodes(context, - list_make1(deletePlacementCommand)); + List *commandList = list_make1(deletePlacementCommand); + CollectCommandIntoMetadataSyncContext(context, commandList); + ProcessBatchCommandsToMetadataNodes(context, forceSend); } /* do not execute local transaction if we collect commands */ - if (!MetadataSyncCollectsCommands(context)) + if (!MetadataSyncInNoConnectionMode(context)) { DeleteShardPlacementRow(placement->placementId); } - - ResetMetadataSyncMemoryContext(context); } MemoryContextSwitchTo(oldContext); } diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index d5878ec71..0767d4934 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -18,6 +18,8 @@ #include "distributed/metadata_cache.h" #include "nodes/pg_list.h" +#define METADATA_SYNC_DEFAULT_BATCH_COUNT 10 + /* managed via guc.c */ typedef enum { @@ -29,6 +31,7 @@ typedef enum extern int MetadataSyncInterval; extern int MetadataSyncRetryInterval; extern int MetadataSyncTransMode; +extern int MetadataSyncBatchCount; /* * MetadataSyncContext is used throughout metadata sync. @@ -39,9 +42,9 @@ typedef struct MetadataSyncContext List *activatedWorkerBareConnections; /* bare connections to activated nodes */ MemoryContext context; /* memory context for all allocations */ MetadataSyncTransactionMode transactionMode; /* transaction mode for the sync */ - bool collectCommands; /* if we collect commands instead of sending and resetting */ - List *collectedCommands; /* collected commands. (NIL if collectCommands == false) */ + List *collectedCommands; /* collected commands. */ bool nodesAddedInSameTransaction; /* if the nodes are added just before activation */ + bool noConnectionMode; /* should we establish connections or just collect commands */ } MetadataSyncContext; typedef enum @@ -139,19 +142,24 @@ extern void SyncNewColocationGroupToNodes(uint32 colocationId, int shardCount, extern void SyncDeleteColocationGroupToNodes(uint32 colocationId); extern MetadataSyncContext * CreateMetadataSyncContext(List *nodeList, - bool collectCommands, - bool nodesAddedInSameTransaction); + bool nodesAddedInSameTransaction, + bool noConnectionMode); extern void EstablishAndSetMetadataSyncBareConnections(MetadataSyncContext *context); extern void SetMetadataSyncNodesFromNodeList(MetadataSyncContext *context, List *nodeList); extern void ResetMetadataSyncMemoryContext(MetadataSyncContext *context); -extern bool MetadataSyncCollectsCommands(MetadataSyncContext *context); -extern void SendOrCollectCommandListToActivatedNodes(MetadataSyncContext *context, - List *commands); -extern void SendOrCollectCommandListToMetadataNodes(MetadataSyncContext *context, - List *commands); -extern void SendOrCollectCommandListToSingleNode(MetadataSyncContext *context, - List *commands, int nodeIdx); +extern void CollectCommandIntoMetadataSyncContext(MetadataSyncContext *context, + List *commandList); +extern void ProcessBatchCommandsToActivatedNodes(MetadataSyncContext *context, bool + forceSend); +extern void ProcessBatchCommandsToMetadataNodes(MetadataSyncContext *context, + bool forceSend); +extern void ProcessBatchCommandsToSingleNode(MetadataSyncContext *context, int nodeIdx, + bool forceSend); +extern bool MetadataSyncInNoConnectionMode(MetadataSyncContext *context); +extern void SendCollectedCommandsToActivatedNodes(MetadataSyncContext *context); +extern void SendCollectedCommandsToMetadataNodes(MetadataSyncContext *context); +extern void SendCollectedCommandsToSingleNode(MetadataSyncContext *context, int nodeIdx); extern void ActivateNodeList(MetadataSyncContext *context);