Metadata sync api supports batching commands.

metadata-command-batch
aykutbozkurt 2023-03-30 11:40:45 +03:00
parent 104e85e18f
commit cee759d66d
6 changed files with 358 additions and 182 deletions

View File

@ -36,16 +36,18 @@
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "commands/async.h"
#include "distributed/adaptive_executor.h"
#include "distributed/argutils.h"
#include "distributed/backend_data.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/deparser.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/distribution_column.h"
#include "distributed/listutils.h"
#include "distributed/metadata_utility.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/maintenanced.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
@ -91,6 +93,7 @@
/* managed via a GUC */
char *EnableManualMetadataChangesForUser = "";
int MetadataSyncTransMode = METADATA_SYNC_TRANSACTIONAL;
int MetadataSyncBatchCount = METADATA_SYNC_DEFAULT_BATCH_COUNT;
static void EnsureObjectMetadataIsSane(int distributionArgumentIndex,
@ -146,6 +149,8 @@ static char * ColocationGroupCreateCommand(uint32 colocationId, int shardCount,
static char * ColocationGroupDeleteCommand(uint32 colocationId);
static char * RemoteTypeIdExpression(Oid typeId);
static char * RemoteCollationIdExpression(Oid colocationId);
static bool ExceedsSyncBatchCount(MetadataSyncContext *context);
static void ExecuteCommandListOnWorkerList(List *commandList, List *workerList);
PG_FUNCTION_INFO_V1(start_metadata_sync_to_all_nodes);
@ -201,11 +206,11 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS)
* It contains activated nodes, bare connections if the mode is nontransactional,
* and a memory context for allocation.
*/
bool collectCommands = false;
bool nodesAddedInSameTransaction = false;
bool noConnectionMode = false;
MetadataSyncContext *context = CreateMetadataSyncContext(list_make1(workerNode),
collectCommands,
nodesAddedInSameTransaction);
nodesAddedInSameTransaction,
noConnectionMode);
ActivateNodeList(context);
TransactionModifiedNodeMetadata = true;
@ -234,11 +239,11 @@ start_metadata_sync_to_all_nodes(PG_FUNCTION_ARGS)
* It contains activated nodes, bare connections if the mode is nontransactional,
* and a memory context for allocation.
*/
bool collectCommands = false;
bool nodesAddedInSameTransaction = false;
bool noConnectionMode = false;
MetadataSyncContext *context = CreateMetadataSyncContext(nodeList,
collectCommands,
nodesAddedInSameTransaction);
nodesAddedInSameTransaction,
noConnectionMode);
ActivateNodeList(context);
TransactionModifiedNodeMetadata = true;
@ -4008,15 +4013,15 @@ EstablishAndSetMetadataSyncBareConnections(MetadataSyncContext *context)
* CreateMetadataSyncContext creates a context which contains worker connections
* and a MemoryContext to be used throughout the metadata sync.
*
* If we collect commands, connections will not be established as caller's intent
* is to collect sync commands.
* If noConnectionMode is set, we do not establish connections. Instead, we just
* collect commands.
*
* If the nodes are newly added before activation, we would not try to unset
* metadatasynced in separate transaction during nontransactional metadatasync.
*/
MetadataSyncContext *
CreateMetadataSyncContext(List *nodeList, bool collectCommands,
bool nodesAddedInSameTransaction)
CreateMetadataSyncContext(List *nodeList, bool nodesAddedInSameTransaction,
bool noConnectionMode)
{
/* should be alive during local transaction during the sync */
MemoryContext context = AllocSetContextCreate(TopTransactionContext,
@ -4028,18 +4033,18 @@ CreateMetadataSyncContext(List *nodeList, bool collectCommands,
metadataSyncContext->context = context;
metadataSyncContext->transactionMode = MetadataSyncTransMode;
metadataSyncContext->collectCommands = collectCommands;
metadataSyncContext->collectedCommands = NIL;
metadataSyncContext->nodesAddedInSameTransaction = nodesAddedInSameTransaction;
metadataSyncContext->noConnectionMode = noConnectionMode;
/* filter the nodes that need to be activated from the given node list */
SetMetadataSyncNodesFromNodeList(metadataSyncContext, nodeList);
/*
* establish connections only for nontransactional mode to prevent connection
* open-close for each command
* Establish connections only for nontransactional mode to prevent connection
* open-close for each command.
*/
if (!collectCommands && MetadataSyncTransMode == METADATA_SYNC_NON_TRANSACTIONAL)
if (!noConnectionMode && MetadataSyncTransMode == METADATA_SYNC_NON_TRANSACTIONAL)
{
EstablishAndSetMetadataSyncBareConnections(metadataSyncContext);
}
@ -4055,53 +4060,160 @@ CreateMetadataSyncContext(List *nodeList, bool collectCommands,
/*
* ResetMetadataSyncMemoryContext resets memory context inside metadataSyncContext, if
* we are not collecting commands.
* ExceedsSyncBatchCount returns whether the given context has accumulated
* more commands than the metadata sync batch limit allows.
*/
void
ResetMetadataSyncMemoryContext(MetadataSyncContext *context)
static bool
ExceedsSyncBatchCount(MetadataSyncContext *context)
{
	/*
	 * Compare against MetadataSyncBatchCount, the variable behind the
	 * citus.metadata_sync_batch_size GUC, instead of its compile-time default.
	 * Otherwise changing the setting would have no effect on when a batch
	 * of collected commands is considered full.
	 */
	int collectedCommandCount = list_length(context->collectedCommands);

	return collectedCommandCount > MetadataSyncBatchCount;
}
/*
 * ExecuteCommandListOnWorkerList executes the given command list on every
 * worker node in the given list.
 *
 * One DDL task is built per worker. Each task carries the whole command list
 * and a single placement filled in from that worker. The resulting task list
 * is handed to ExecuteUtilityTaskList with localExecutionSupported = false.
 */
static void
ExecuteCommandListOnWorkerList(List *commandList, List *workerList)
{
	List *taskList = NIL;

	WorkerNode *workerNode = NULL;
	foreach_ptr(workerNode, workerList)
	{
		/* placement that targets this worker */
		ShardPlacement *placement = CitusMakeNode(ShardPlacement);
		SetPlacementNodeMetadata(placement, workerNode);

		/* every worker receives the same list of commands */
		Task *task = CitusMakeNode(Task);
		task->taskType = DDL_TASK;
		SetTaskQueryStringList(task, commandList);
		task->taskPlacementList = list_make1(placement);

		taskList = lappend(taskList, task);
	}

	bool localExecutionSupported = false;
	ExecuteUtilityTaskList(taskList, localExecutionSupported);
}
/*
* MetadataSyncCollectsCommands returns whether context is used for collecting
* commands instead of sending them to workers.
*/
bool
MetadataSyncCollectsCommands(MetadataSyncContext *context)
{
return context->collectCommands;
}
/*
* SendOrCollectCommandListToActivatedNodes sends the commands to the activated nodes with
* bare connections inside metadatacontext or via coordinated connections.
* Note that when context only collects commands, we add commands into the context
* without sending the commands.
* ProcessBatchCommandsToActivatedNodes processes collected commands inside
* metadataSyncContext. Sends commands to activated workers if batch limit
* is exceeded or user forces. If noConnectionMode is set, it would not process
* the commands.
*/
void
SendOrCollectCommandListToActivatedNodes(MetadataSyncContext *context, List *commands)
ProcessBatchCommandsToActivatedNodes(MetadataSyncContext *context, bool forceSend)
{
/* do nothing if no commands */
if (commands == NIL)
/* do not send and reset commands if we are in noConnectionMode */
if (MetadataSyncInNoConnectionMode(context))
{
return;
}
/*
* do not send any command to workers if we collect commands.
* Collect commands into metadataSyncContext's collected command
* list.
*/
if (MetadataSyncCollectsCommands(context))
if (forceSend || ExceedsSyncBatchCount(context))
{
SendCollectedCommandsToActivatedNodes(context);
ResetMetadataSyncMemoryContext(context);
}
}
/*
 * ProcessBatchCommandsToMetadataNodes processes the commands collected inside
 * metadataSyncContext for the metadata nodes.
 *
 * The collected commands are sent via SendCollectedCommandsToMetadataNodes and
 * the context is reset afterwards, but only if forceSend is set or the batch
 * limit is exceeded. In noConnectionMode nothing is sent or reset.
 */
void
ProcessBatchCommandsToMetadataNodes(MetadataSyncContext *context, bool forceSend)
{
	/* never send in noConnectionMode; otherwise send when forced or batch is full */
	bool shouldSend = !MetadataSyncInNoConnectionMode(context) &&
					  (forceSend || ExceedsSyncBatchCount(context));
	if (!shouldSend)
	{
		return;
	}

	SendCollectedCommandsToMetadataNodes(context);
	ResetMetadataSyncMemoryContext(context);
}
/*
 * ProcessBatchCommandsToSingleNode processes the commands collected inside
 * metadataSyncContext for a single node only.
 *
 * nodeIdx identifies the target node and is passed through to
 * SendCollectedCommandsToSingleNode. Commands are sent, and the context is
 * reset afterwards, only if forceSend is set or the batch limit is exceeded.
 * In noConnectionMode nothing is sent or reset.
 */
void
ProcessBatchCommandsToSingleNode(MetadataSyncContext *context, int nodeIdx,
								 bool forceSend)
{
	/* collected commands are kept as they are in noConnectionMode */
	if (MetadataSyncInNoConnectionMode(context))
	{
		return;
	}

	bool batchIsDue = forceSend || ExceedsSyncBatchCount(context);
	if (!batchIsDue)
	{
		return;
	}

	SendCollectedCommandsToSingleNode(context, nodeIdx);
	ResetMetadataSyncMemoryContext(context);
}
/*
* ResetMetadataSyncMemoryContext resets the memory context inside metadataSyncContext
* and clears the list of commands collected in it.
*
* It asserts that the context is not in noConnectionMode.
*/
void
ResetMetadataSyncMemoryContext(MetadataSyncContext *context)
{
Assert(!MetadataSyncInNoConnectionMode(context));
MemoryContextReset(context->context);
/* drop the list pointer as well so that later collection starts from an empty list */
context->collectedCommands = NIL;
}
/*
 * CollectCommandIntoMetadataSyncContext appends the given commands to the list
 * of commands collected inside metadataSyncContext. It does not send anything.
 */
void
CollectCommandIntoMetadataSyncContext(MetadataSyncContext *context, List *commandList)
{
	List *alreadyCollected = context->collectedCommands;

	context->collectedCommands = list_concat(alreadyCollected, commandList);
}
/*
 * MetadataSyncInNoConnectionMode returns the noConnectionMode flag of the given
 * context, i.e. whether the context is used only for collecting commands
 * instead of sending them to workers.
 */
bool
MetadataSyncInNoConnectionMode(MetadataSyncContext *context)
{
	bool inNoConnectionMode = context->noConnectionMode;

	return inNoConnectionMode;
}
/*
* SendCollectedCommandsToActivatedNodes sends collected commands to the activated
* nodes with bare connections inside metadatacontext or via coordinated connections.
* This function must not be called in noConnectionMode, where commands are only
* collected and never sent; it asserts that.
*/
void
SendCollectedCommandsToActivatedNodes(MetadataSyncContext *context)
{
Assert(!MetadataSyncInNoConnectionMode(context));
/* do nothing if no commands */
List *commands = context->collectedCommands;
if (commands == NIL)
{
context->collectedCommands = list_concat(context->collectedCommands, commands);
return;
}
@ -4111,9 +4223,11 @@ SendOrCollectCommandListToActivatedNodes(MetadataSyncContext *context, List *com
if (context->transactionMode == METADATA_SYNC_TRANSACTIONAL)
{
List *workerNodes = context->activatedWorkerNodeList;
SendMetadataCommandListToWorkerListInCoordinatedTransaction(workerNodes,
CurrentUserName(),
commands);
ExecuteCommandListOnWorkerList(commands, workerNodes);
/* SendMetadataCommandListToWorkerListInCoordinatedTransaction(workerNodes, */
/* CurrentUserName(), */
/* commands); */
}
else if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL)
{
@ -4128,22 +4242,20 @@ SendOrCollectCommandListToActivatedNodes(MetadataSyncContext *context, List *com
/*
* SendOrCollectCommandListToMetadataNodes sends the commands to the metadata nodes with
* bare connections inside metadatacontext or via coordinated connections.
* Note that when context only collects commands, we add commands into the context
* without sending the commands.
* SendCollectedCommandsToMetadataNodes sends collected commands to the metadata
* nodes with bare connections inside metadatacontext or via coordinated connections.
* This function must not be called in noConnectionMode, where commands are only
* collected and never sent; it asserts that.
*/
void
SendOrCollectCommandListToMetadataNodes(MetadataSyncContext *context, List *commands)
SendCollectedCommandsToMetadataNodes(MetadataSyncContext *context)
{
/*
* do not send any command to workers if we collcet commands.
* Collect commands into metadataSyncContext's collected command
* list.
*/
if (MetadataSyncCollectsCommands(context))
Assert(!MetadataSyncInNoConnectionMode(context));
/* do nothing if no commands */
List *commands = context->collectedCommands;
if (commands == NIL)
{
context->collectedCommands = list_concat(context->collectedCommands, commands);
return;
}
@ -4154,9 +4266,11 @@ SendOrCollectCommandListToMetadataNodes(MetadataSyncContext *context, List *comm
{
List *metadataNodes = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
RowShareLock);
SendMetadataCommandListToWorkerListInCoordinatedTransaction(metadataNodes,
CurrentUserName(),
commands);
ExecuteCommandListOnWorkerList(commands, metadataNodes);
/* SendMetadataCommandListToWorkerListInCoordinatedTransaction(metadataNodes, */
/* CurrentUserName(), */
/* commands); */
}
else if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL)
{
@ -4170,23 +4284,20 @@ SendOrCollectCommandListToMetadataNodes(MetadataSyncContext *context, List *comm
/*
* SendOrCollectCommandListToSingleNode sends the commands to the specific worker
* SendCollectedCommandsToSingleNode sends collected commands to the specific worker
* indexed by nodeIdx with bare connection inside metadatacontext or via coordinated
* connection. Note that when context only collects commands, we add commands into
* connection. This function must not be called in noConnectionMode, where commands
* are only collected and never sent; it asserts that.
*/
void
SendOrCollectCommandListToSingleNode(MetadataSyncContext *context, List *commands,
int nodeIdx)
SendCollectedCommandsToSingleNode(MetadataSyncContext *context, int nodeIdx)
{
/*
* Do not send any command to workers if we collect commands.
* Collect commands into metadataSyncContext's collected command
* list.
*/
if (MetadataSyncCollectsCommands(context))
Assert(!MetadataSyncInNoConnectionMode(context));
/* do nothing if no commands */
List *commands = context->collectedCommands;
if (commands == NIL)
{
context->collectedCommands = list_concat(context->collectedCommands, commands);
return;
}
@ -4199,9 +4310,11 @@ SendOrCollectCommandListToSingleNode(MetadataSyncContext *context, List *command
Assert(nodeIdx < list_length(workerNodes));
WorkerNode *node = list_nth(workerNodes, nodeIdx);
SendMetadataCommandListToWorkerListInCoordinatedTransaction(list_make1(node),
CurrentUserName(),
commands);
ExecuteCommandListOnWorkerList(commands, list_make1(node));
/* SendMetadataCommandListToWorkerListInCoordinatedTransaction(list_make1(node), */
/* CurrentUserName(), */
/* commands); */
}
else if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL)
{
@ -4243,7 +4356,6 @@ WorkerDropAllShellTablesCommand(bool singleTransaction)
static List *
PropagateNodeWideObjectsCommandList(void)
{
/* collect all commands */
List *ddlCommands = NIL;
if (EnableAlterRoleSetPropagation)
@ -4330,17 +4442,24 @@ SyncDistributedObjects(MetadataSyncContext *context)
void
SendNodeWideObjectsSyncCommands(MetadataSyncContext *context)
{
MemoryContext oldContext = MemoryContextSwitchTo(context->context);
/* propagate node wide objects. It includes only roles for now. */
List *commandList = PropagateNodeWideObjectsCommandList();
if (commandList == NIL)
{
MemoryContextSwitchTo(oldContext);
return;
}
commandList = lcons(DISABLE_DDL_PROPAGATION, commandList);
commandList = lappend(commandList, ENABLE_DDL_PROPAGATION);
SendOrCollectCommandListToActivatedNodes(context, commandList);
bool forceSend = true;
CollectCommandIntoMetadataSyncContext(context, commandList);
ProcessBatchCommandsToActivatedNodes(context, forceSend);
MemoryContextSwitchTo(oldContext);
}
@ -4352,14 +4471,22 @@ SendNodeWideObjectsSyncCommands(MetadataSyncContext *context)
void
SendShellTableDeletionCommands(MetadataSyncContext *context)
{
bool forceSend = true;
MemoryContext oldContext = MemoryContextSwitchTo(context->context);
/* break all sequence deps for citus tables and remove all shell tables */
char *breakSeqDepsCommand = BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND;
SendOrCollectCommandListToActivatedNodes(context, list_make1(breakSeqDepsCommand));
CollectCommandIntoMetadataSyncContext(context, list_make1(breakSeqDepsCommand));
ProcessBatchCommandsToActivatedNodes(context, forceSend);
/* remove shell tables */
bool singleTransaction = (context->transactionMode == METADATA_SYNC_TRANSACTIONAL);
char *dropShellTablesCommand = WorkerDropAllShellTablesCommand(singleTransaction);
SendOrCollectCommandListToActivatedNodes(context, list_make1(dropShellTablesCommand));
CollectCommandIntoMetadataSyncContext(context, list_make1(dropShellTablesCommand));
ProcessBatchCommandsToActivatedNodes(context, forceSend);
MemoryContextSwitchTo(oldContext);
}
@ -4371,21 +4498,27 @@ SendShellTableDeletionCommands(MetadataSyncContext *context)
void
SendMetadataDeletionCommands(MetadataSyncContext *context)
{
MemoryContext oldContext = MemoryContextSwitchTo(context->context);
/* remove pg_dist_partition entries */
SendOrCollectCommandListToActivatedNodes(context, list_make1(DELETE_ALL_PARTITIONS));
CollectCommandIntoMetadataSyncContext(context, list_make1(DELETE_ALL_PARTITIONS));
/* remove pg_dist_shard entries */
SendOrCollectCommandListToActivatedNodes(context, list_make1(DELETE_ALL_SHARDS));
CollectCommandIntoMetadataSyncContext(context, list_make1(DELETE_ALL_SHARDS));
/* remove pg_dist_placement entries */
SendOrCollectCommandListToActivatedNodes(context, list_make1(DELETE_ALL_PLACEMENTS));
CollectCommandIntoMetadataSyncContext(context, list_make1(DELETE_ALL_PLACEMENTS));
/* remove pg_dist_object entries */
SendOrCollectCommandListToActivatedNodes(context,
list_make1(DELETE_ALL_DISTRIBUTED_OBJECTS));
CollectCommandIntoMetadataSyncContext(context,
list_make1(DELETE_ALL_DISTRIBUTED_OBJECTS));
/* remove pg_dist_colocation entries */
SendOrCollectCommandListToActivatedNodes(context, list_make1(DELETE_ALL_COLOCATION));
CollectCommandIntoMetadataSyncContext(context, list_make1(DELETE_ALL_COLOCATION));
bool forceSend = true;
ProcessBatchCommandsToActivatedNodes(context, forceSend);
MemoryContextSwitchTo(oldContext);
}
@ -4396,6 +4529,7 @@ SendMetadataDeletionCommands(MetadataSyncContext *context)
void
SendColocationMetadataCommands(MetadataSyncContext *context)
{
bool forceSend = false;
ScanKeyData scanKey[1];
int scanKeyCount = 0;
@ -4407,11 +4541,12 @@ SendColocationMetadataCommands(MetadataSyncContext *context)
HeapTuple nextTuple = NULL;
while (true)
{
ResetMetadataSyncMemoryContext(context);
nextTuple = systable_getnext(scanDesc);
if (!HeapTupleIsValid(nextTuple))
{
/* send if we have commands that are not sent */
forceSend = true;
ProcessBatchCommandsToActivatedNodes(context, forceSend);
break;
}
@ -4476,7 +4611,8 @@ SendColocationMetadataCommands(MetadataSyncContext *context)
" = c.collnamespace)");
List *commandList = list_make1(colocationGroupCreateCommand->data);
SendOrCollectCommandListToActivatedNodes(context, commandList);
CollectCommandIntoMetadataSyncContext(context, commandList);
ProcessBatchCommandsToActivatedNodes(context, forceSend);
}
MemoryContextSwitchTo(oldContext);
@ -4493,11 +4629,14 @@ SendColocationMetadataCommands(MetadataSyncContext *context)
void
SendDependencyCreationCommands(MetadataSyncContext *context)
{
/* disable ddl propagation */
SendOrCollectCommandListToActivatedNodes(context,
list_make1(DISABLE_DDL_PROPAGATION));
MemoryContext oldContext = MemoryContextSwitchTo(context->context);
/*
* We need to preserve collected dependencies as we reset memory context
* inside metadataSyncContext during commands generation below.
*/
MemoryContext dependencyContext = AllocSetContextCreate(CurrentMemoryContext,
"dependency context",
ALLOCSET_DEFAULT_SIZES);
MemoryContext oldContext = MemoryContextSwitchTo(dependencyContext);
/* collect all dependencies in creation order and get their ddl commands */
List *dependencies = GetDistributedObjectAddressList();
@ -4512,22 +4651,16 @@ SendDependencyCreationCommands(MetadataSyncContext *context)
dependencies = OrderObjectAddressListInDependencyOrder(dependencies);
/*
* We need to create a subcontext as we reset the context after each dependency
* creation but we want to preserve all dependency objects at metadataSyncContext.
*/
MemoryContext commandsContext = AllocSetContextCreate(context->context,
"dependency commands context",
ALLOCSET_DEFAULT_SIZES);
MemoryContextSwitchTo(commandsContext);
MemoryContextSwitchTo(context->context);
/* disable ddl propagation */
bool forceSend = true;
CollectCommandIntoMetadataSyncContext(context, list_make1(DISABLE_DDL_PROPAGATION));
ProcessBatchCommandsToActivatedNodes(context, forceSend);
ObjectAddress *dependency = NULL;
foreach_ptr(dependency, dependencies)
{
if (!MetadataSyncCollectsCommands(context))
{
MemoryContextReset(commandsContext);
}
if (IsAnyObjectAddressOwnedByExtension(list_make1(dependency), NULL))
{
/*
@ -4538,19 +4671,18 @@ SendDependencyCreationCommands(MetadataSyncContext *context)
}
/* dependency creation commands */
forceSend = false;
List *ddlCommands = GetAllDependencyCreateDDLCommands(list_make1(dependency));
SendOrCollectCommandListToActivatedNodes(context, ddlCommands);
CollectCommandIntoMetadataSyncContext(context, ddlCommands);
ProcessBatchCommandsToActivatedNodes(context, forceSend);
}
MemoryContextSwitchTo(oldContext);
if (!MetadataSyncCollectsCommands(context))
{
MemoryContextDelete(commandsContext);
}
ResetMetadataSyncMemoryContext(context);
/* enable ddl propagation */
SendOrCollectCommandListToActivatedNodes(context, list_make1(ENABLE_DDL_PROPAGATION));
forceSend = true;
CollectCommandIntoMetadataSyncContext(context, list_make1(ENABLE_DDL_PROPAGATION));
ProcessBatchCommandsToActivatedNodes(context, forceSend);
MemoryContextSwitchTo(oldContext);
MemoryContextDelete(dependencyContext);
}
@ -4562,6 +4694,7 @@ SendDependencyCreationCommands(MetadataSyncContext *context)
void
SendDistTableMetadataCommands(MetadataSyncContext *context)
{
bool forceSend = false;
ScanKeyData scanKey[1];
int scanKeyCount = 0;
@ -4575,11 +4708,12 @@ SendDistTableMetadataCommands(MetadataSyncContext *context)
HeapTuple nextTuple = NULL;
while (true)
{
ResetMetadataSyncMemoryContext(context);
nextTuple = systable_getnext(scanDesc);
if (!HeapTupleIsValid(nextTuple))
{
/* send if we have commands that are not sent */
forceSend = true;
ProcessBatchCommandsToActivatedNodes(context, forceSend);
break;
}
@ -4594,7 +4728,8 @@ SendDistTableMetadataCommands(MetadataSyncContext *context)
}
List *commandList = CitusTableMetadataCreateCommandList(relationId);
SendOrCollectCommandListToActivatedNodes(context, commandList);
CollectCommandIntoMetadataSyncContext(context, commandList);
ProcessBatchCommandsToActivatedNodes(context, forceSend);
}
MemoryContextSwitchTo(oldContext);
@ -4611,6 +4746,7 @@ SendDistTableMetadataCommands(MetadataSyncContext *context)
void
SendDistObjectCommands(MetadataSyncContext *context)
{
bool forceSend = false;
ScanKeyData scanKey[1];
int scanKeyCount = 0;
@ -4624,11 +4760,12 @@ SendDistObjectCommands(MetadataSyncContext *context)
HeapTuple nextTuple = NULL;
while (true)
{
ResetMetadataSyncMemoryContext(context);
nextTuple = systable_getnext(scanDesc);
if (!HeapTupleIsValid(nextTuple))
{
/* send if we have commands that are not sent */
forceSend = true;
ProcessBatchCommandsToActivatedNodes(context, forceSend);
break;
}
@ -4678,13 +4815,15 @@ SendDistObjectCommands(MetadataSyncContext *context)
forceDelegation = NO_FORCE_PUSHDOWN;
}
forceSend = false;
char *workerMetadataUpdateCommand =
MarkObjectsDistributedCreateCommand(list_make1(address),
list_make1_int(distributionArgumentIndex),
list_make1_int(colocationId),
list_make1_int(forceDelegation));
SendOrCollectCommandListToActivatedNodes(context,
list_make1(workerMetadataUpdateCommand));
List *commandList = list_make1(workerMetadataUpdateCommand);
CollectCommandIntoMetadataSyncContext(context, commandList);
ProcessBatchCommandsToActivatedNodes(context, forceSend);
}
MemoryContextSwitchTo(oldContext);
@ -4702,10 +4841,6 @@ SendDistObjectCommands(MetadataSyncContext *context)
void
SendInterTableRelationshipCommands(MetadataSyncContext *context)
{
/* disable ddl propagation */
SendOrCollectCommandListToActivatedNodes(context,
list_make1(DISABLE_DDL_PROPAGATION));
ScanKeyData scanKey[1];
int scanKeyCount = 0;
@ -4716,11 +4851,15 @@ SendInterTableRelationshipCommands(MetadataSyncContext *context)
scanKeyCount, scanKey);
MemoryContext oldContext = MemoryContextSwitchTo(context->context);
/* disable ddl propagation */
bool forceSend = true;
CollectCommandIntoMetadataSyncContext(context, list_make1(DISABLE_DDL_PROPAGATION));
ProcessBatchCommandsToActivatedNodes(context, forceSend);
HeapTuple nextTuple = NULL;
while (true)
{
ResetMetadataSyncMemoryContext(context);
nextTuple = systable_getnext(scanDesc);
if (!HeapTupleIsValid(nextTuple))
{
@ -4742,14 +4881,18 @@ SendInterTableRelationshipCommands(MetadataSyncContext *context)
continue;
}
forceSend = false;
List *commandList = InterTableRelationshipOfRelationCommandList(relationId);
SendOrCollectCommandListToActivatedNodes(context, commandList);
CollectCommandIntoMetadataSyncContext(context, commandList);
ProcessBatchCommandsToActivatedNodes(context, forceSend);
}
/* enable ddl propagation */
forceSend = true;
CollectCommandIntoMetadataSyncContext(context, list_make1(ENABLE_DDL_PROPAGATION));
ProcessBatchCommandsToActivatedNodes(context, forceSend);
MemoryContextSwitchTo(oldContext);
systable_endscan(scanDesc);
table_close(relation, AccessShareLock);
/* enable ddl propagation */
SendOrCollectCommandListToActivatedNodes(context, list_make1(ENABLE_DDL_PROPAGATION));
}

View File

@ -762,11 +762,11 @@ citus_activate_node(PG_FUNCTION_ARGS)
* It contains activated nodes, bare connections if the mode is nontransactional,
* and a memory context for allocation.
*/
bool collectCommands = false;
bool nodesAddedInSameTransaction = false;
bool noConnectionMode = false;
MetadataSyncContext *context = CreateMetadataSyncContext(list_make1(workerNode),
collectCommands,
nodesAddedInSameTransaction);
nodesAddedInSameTransaction,
noConnectionMode);
ActivateNodeList(context);
TransactionModifiedNodeMetadata = true;
@ -932,7 +932,7 @@ MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context,
pid_t parentSessionPid)
{
Assert(context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL);
Assert(!MetadataSyncCollectsCommands(context));
Assert(!MetadataSyncInNoConnectionMode(context));
/*
* Set metadatasynced to false for all activated nodes to mark the nodes as not synced
@ -1000,8 +1000,8 @@ MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context,
static void
SetNodeMetadata(MetadataSyncContext *context, bool localOnly)
{
/* do not execute local transaction if we collect commands */
if (!MetadataSyncCollectsCommands(context))
/* do not execute local transaction if we are in noConnectionMode */
if (!MetadataSyncInNoConnectionMode(context))
{
List *updatedActivatedNodeList = NIL;
@ -1022,7 +1022,7 @@ SetNodeMetadata(MetadataSyncContext *context, bool localOnly)
SetMetadataSyncNodesFromNodeList(context, updatedActivatedNodeList);
}
if (!localOnly && EnableMetadataSync)
if (!localOnly)
{
WorkerNode *node = NULL;
foreach_ptr(node, context->activatedWorkerNodeList)
@ -1040,7 +1040,7 @@ SetNodeMetadata(MetadataSyncContext *context, bool localOnly)
* The function operates in 3 different modes according to transactionMode inside
* metadataSyncContext.
*
* 1. MetadataSyncCollectsCommands(context):
* 1. MetadataSyncInNoConnectionMode(context):
* Only collect commands instead of sending them to workers,
* 2. context.transactionMode == METADATA_SYNC_TRANSACTIONAL:
* Send all commands using coordinated transaction,
@ -1090,15 +1090,14 @@ ActivateNodeList(MetadataSyncContext *context)
/*
* we need to unset metadatasynced flag to false at coordinator in separate
* transaction only at nontransactional sync mode and if we do not collect
* commands.
* transaction only at nontransactional sync mode and if we are not in noConnectionMode.
*
* We make sure we set the flag to false at the start of nontransactional
* metadata sync to mark those nodes are not synced in case of a failure in
* the middle of the sync.
*/
if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL &&
!MetadataSyncCollectsCommands(context))
!MetadataSyncInNoConnectionMode(context))
{
MarkNodesNotSyncedInLoopBackConnection(context, MyProcPid);
}
@ -2201,16 +2200,19 @@ AddNodeMetadataViaMetadataContext(char *nodeName, int32 nodePort,
}
List *nodeList = list_make1(node);
bool collectCommands = false;
bool nodesAddedInSameTransaction = true;
MetadataSyncContext *context = CreateMetadataSyncContext(nodeList, collectCommands,
nodesAddedInSameTransaction);
bool noConnectionMode = false;
MetadataSyncContext *context = CreateMetadataSyncContext(nodeList,
nodesAddedInSameTransaction,
noConnectionMode);
if (EnableMetadataSync)
{
MemoryContext oldContext = MemoryContextSwitchTo(context->context);
/* send the delete command to all primary nodes with metadata */
char *nodeDeleteCommand = NodeDeleteCommand(node->nodeId);
SendOrCollectCommandListToMetadataNodes(context, list_make1(nodeDeleteCommand));
CollectCommandIntoMetadataSyncContext(context, list_make1(nodeDeleteCommand));
/* finally prepare the insert command and send it to all primary nodes */
uint32 primariesWithMetadata = CountPrimariesWithMetadata();
@ -2230,9 +2232,12 @@ AddNodeMetadataViaMetadataContext(char *nodeName, int32 nodePort,
nodeInsertCommand = NodeListIdempotentInsertCommand(nodeList);
}
Assert(nodeInsertCommand != NULL);
SendOrCollectCommandListToMetadataNodes(context,
list_make1(nodeInsertCommand));
CollectCommandIntoMetadataSyncContext(context, list_make1(nodeInsertCommand));
}
bool forceSend = true;
ProcessBatchCommandsToMetadataNodes(context, forceSend);
MemoryContextSwitchTo(oldContext);
}
ActivateNodeList(context);
@ -2273,6 +2278,8 @@ static void
SetNodeStateViaMetadataContext(MetadataSyncContext *context, WorkerNode *workerNode,
Datum value)
{
MemoryContext oldContext = MemoryContextSwitchTo(context->context);
char *isActiveCommand =
GetMetadataSyncCommandToSetNodeColumn(workerNode, Anum_pg_dist_node_isactive,
value);
@ -2285,7 +2292,10 @@ SetNodeStateViaMetadataContext(MetadataSyncContext *context, WorkerNode *workerN
List *commandList = list_make3(isActiveCommand, metadatasyncedCommand,
hasmetadataCommand);
SendOrCollectCommandListToMetadataNodes(context, commandList);
bool forceSend = true;
CollectCommandIntoMetadataSyncContext(context, commandList);
ProcessBatchCommandsToMetadataNodes(context, forceSend);
MemoryContextSwitchTo(oldContext);
}
@ -3033,18 +3043,18 @@ ErrorIfAnyNodeNotExist(List *nodeList)
static void
UpdateLocalGroupIdsViaMetadataContext(MetadataSyncContext *context)
{
MemoryContext oldContext = MemoryContextSwitchTo(context->context);
bool forceSend = true;
int activatedPrimaryCount = list_length(context->activatedWorkerNodeList);
int nodeIdx = 0;
for (nodeIdx = 0; nodeIdx < activatedPrimaryCount; nodeIdx++)
{
WorkerNode *node = list_nth(context->activatedWorkerNodeList, nodeIdx);
List *commandList = list_make1(LocalGroupIdUpdateCommand(node->groupId));
/* send commands to new workers, the current user should be a superuser */
Assert(superuser());
SendOrCollectCommandListToSingleNode(context, commandList, nodeIdx);
CollectCommandIntoMetadataSyncContext(context, commandList);
ProcessBatchCommandsToSingleNode(context, nodeIdx, forceSend);
}
MemoryContextSwitchTo(oldContext);
}
@ -3089,7 +3099,7 @@ SyncNodeMetadata(MetadataSyncContext *context)
* Do not fail when we call this method from activate_node_snapshot
* from workers.
*/
if (!MetadataSyncCollectsCommands(context))
if (!MetadataSyncInNoConnectionMode(context))
{
EnsureCoordinator();
}
@ -3099,6 +3109,8 @@ SyncNodeMetadata(MetadataSyncContext *context)
LockRelationOid(DistNodeRelationId(), ExclusiveLock);
MemoryContext oldContext = MemoryContextSwitchTo(context->context);
/* generate the queries which drop the node metadata */
List *dropMetadataCommandList = NodeMetadataDropCommands();
@ -3113,5 +3125,8 @@ SyncNodeMetadata(MetadataSyncContext *context)
* We should have already added node metadata to metadata workers. Sync node
* metadata just for activated workers.
*/
SendOrCollectCommandListToActivatedNodes(context, recreateNodeSnapshotCommandList);
bool forceSend = true;
CollectCommandIntoMetadataSyncContext(context, recreateNodeSnapshotCommandList);
ProcessBatchCommandsToActivatedNodes(context, forceSend);
MemoryContextSwitchTo(oldContext);
}

View File

@ -1873,6 +1873,17 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD,
NULL, NULL, NULL);
/*
 * citus.metadata_sync_batch_size is stored in MetadataSyncBatchCount. It
 * defaults to METADATA_SYNC_DEFAULT_BATCH_COUNT, accepts values from 1 to 1000,
 * can only be set by superusers (PGC_SUSET) and is hidden from SHOW ALL.
 * NOTE(review): the GUC is named "..._batch_size" while the backing variable
 * and the default macro say "BatchCount" -- consider aligning the names.
 */
DefineCustomIntVariable(
"citus.metadata_sync_batch_size",
gettext_noop("Sets total number of commands in a batch for metadata syncs."),
gettext_noop("metadata sync commands are sent as batches configured by "
"the batch size to reduce network round trip delay."),
&MetadataSyncBatchCount,
METADATA_SYNC_DEFAULT_BATCH_COUNT, 1, 1000,
PGC_SUSET,
GUC_SUPERUSER_ONLY | GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.metadata_sync_interval",
gettext_noop("Sets the time to wait between metadata syncs."),

View File

@ -51,14 +51,14 @@ activate_node_snapshot(PG_FUNCTION_ARGS)
/*
* Create MetadataSyncContext which is used throughout nodes' activation.
* As we set collectCommands to true, it would not create connections to workers.
* Instead it would collect and return sync commands to be sent to workers.
* As we set noConnectionMode to true, it would not establish connections to
* workers. Instead it would collect and return sync commands to be sent to workers.
*/
bool collectCommands = true;
bool nodesAddedInSameTransaction = false;
bool noConnectionMode = true;
MetadataSyncContext *context = CreateMetadataSyncContext(list_make1(dummyWorkerNode),
collectCommands,
nodesAddedInSameTransaction);
nodesAddedInSameTransaction,
noConnectionMode);
ActivateNodeList(context);

View File

@ -624,6 +624,7 @@ DeleteAllReplicatedTablePlacementsFromNodeGroupViaMetadataContext(
}
MemoryContext oldContext = MemoryContextSwitchTo(context->context);
bool forceSend = false;
GroupShardPlacement *placement = NULL;
foreach_ptr(placement, replicatedPlacementListForGroup)
{
@ -633,18 +634,16 @@ DeleteAllReplicatedTablePlacementsFromNodeGroupViaMetadataContext(
{
char *deletePlacementCommand =
DeleteShardPlacementCommand(placement->placementId);
SendOrCollectCommandListToMetadataNodes(context,
list_make1(deletePlacementCommand));
List *commandList = list_make1(deletePlacementCommand);
CollectCommandIntoMetadataSyncContext(context, commandList);
ProcessBatchCommandsToMetadataNodes(context, forceSend);
}
/* do not delete the local placement row if we only collect commands (no-connection mode) */
if (!MetadataSyncCollectsCommands(context))
if (!MetadataSyncInNoConnectionMode(context))
{
DeleteShardPlacementRow(placement->placementId);
}
ResetMetadataSyncMemoryContext(context);
}
MemoryContextSwitchTo(oldContext);
}

View File

@ -18,6 +18,8 @@
#include "distributed/metadata_cache.h"
#include "nodes/pg_list.h"
#define METADATA_SYNC_DEFAULT_BATCH_COUNT 10
/* managed via guc.c */
typedef enum
{
@ -29,6 +31,7 @@ typedef enum
extern int MetadataSyncInterval;
extern int MetadataSyncRetryInterval;
extern int MetadataSyncTransMode;
extern int MetadataSyncBatchCount;
/*
* MetadataSyncContext is used throughout metadata sync.
@ -39,9 +42,9 @@ typedef struct MetadataSyncContext
List *activatedWorkerBareConnections; /* bare connections to activated nodes */
MemoryContext context; /* memory context for all allocations */
MetadataSyncTransactionMode transactionMode; /* transaction mode for the sync */
bool collectCommands; /* if we collect commands instead of sending and resetting */
List *collectedCommands; /* collected commands. (NIL if collectCommands == false) */
List *collectedCommands; /* collected commands. */
bool nodesAddedInSameTransaction; /* if the nodes are added just before activation */
bool noConnectionMode; /* should we establish connections or just collect commands */
} MetadataSyncContext;
typedef enum
@ -139,19 +142,24 @@ extern void SyncNewColocationGroupToNodes(uint32 colocationId, int shardCount,
extern void SyncDeleteColocationGroupToNodes(uint32 colocationId);
extern MetadataSyncContext * CreateMetadataSyncContext(List *nodeList,
bool collectCommands,
bool nodesAddedInSameTransaction);
bool nodesAddedInSameTransaction,
bool noConnectionMode);
extern void EstablishAndSetMetadataSyncBareConnections(MetadataSyncContext *context);
extern void SetMetadataSyncNodesFromNodeList(MetadataSyncContext *context,
List *nodeList);
extern void ResetMetadataSyncMemoryContext(MetadataSyncContext *context);
extern bool MetadataSyncCollectsCommands(MetadataSyncContext *context);
extern void SendOrCollectCommandListToActivatedNodes(MetadataSyncContext *context,
List *commands);
extern void SendOrCollectCommandListToMetadataNodes(MetadataSyncContext *context,
List *commands);
extern void SendOrCollectCommandListToSingleNode(MetadataSyncContext *context,
List *commands, int nodeIdx);
/*
 * Batching API for metadata sync. Callers first add a command list to the
 * context with CollectCommandIntoMetadataSyncContext and then call one of the
 * ProcessBatchCommandsTo* functions for the intended target (all activated
 * nodes, all metadata nodes, or the single node at nodeIdx).
 * NOTE(review): presumably the collected commands are only sent once the batch
 * reaches citus.metadata_sync_batch_size commands unless forceSend is true --
 * confirm against the function definitions.
 */
extern void CollectCommandIntoMetadataSyncContext(MetadataSyncContext *context,
List *commandList);
extern void ProcessBatchCommandsToActivatedNodes(MetadataSyncContext *context, bool
forceSend);
extern void ProcessBatchCommandsToMetadataNodes(MetadataSyncContext *context,
bool forceSend);
extern void ProcessBatchCommandsToSingleNode(MetadataSyncContext *context, int nodeIdx,
bool forceSend);
extern bool MetadataSyncInNoConnectionMode(MetadataSyncContext *context);
extern void SendCollectedCommandsToActivatedNodes(MetadataSyncContext *context);
extern void SendCollectedCommandsToMetadataNodes(MetadataSyncContext *context);
extern void SendCollectedCommandsToSingleNode(MetadataSyncContext *context, int nodeIdx);
extern void ActivateNodeList(MetadataSyncContext *context);