PR #6728  / commit - 2

- Create MetadataSyncContext api to encapsulate
  both transactional and nontransactional modes,
- Add a GUC to switch between metadata sync transaction modes.
pull/6728/head
aykutbozkurt 2023-03-28 13:01:45 +03:00
parent 98abd68178
commit 85d50203d1
3 changed files with 385 additions and 0 deletions

View File

@ -90,6 +90,7 @@
/* managed via a GUC */
char *EnableManualMetadataChangesForUser = "";
int MetadataSyncTransMode = METADATA_SYNC_TRANSACTIONAL;
static void EnsureObjectMetadataIsSane(int distributionArgumentIndex,
@ -4157,3 +4158,330 @@ ColocationGroupCreateCommandList(void)
return list_make1(colocationGroupCreateCommand->data);
}
/*
* SetMetadataSyncNodesFromNodeList sets list of nodes that needs to be metadata
* synced among given node list into metadataSyncContext.
*/
void
SetMetadataSyncNodesFromNodeList(MetadataSyncContext *context, List *nodeList)
{
List *activatedWorkerNodeList = NIL;
WorkerNode *node = NULL;
foreach_ptr(node, nodeList)
{
if (EnableMetadataSync && NodeIsPrimary(node))
{
/* warn if we have coordinator in nodelist */
if (NodeIsCoordinator(node))
{
ereport(NOTICE, (errmsg("%s:%d is the coordinator and already contains "
"metadata, skipping syncing the metadata",
node->workerName, node->workerPort)));
continue;
}
activatedWorkerNodeList = lappend(activatedWorkerNodeList, node);
}
}
context->activatedWorkerNodeList = activatedWorkerNodeList;
}
/*
* EstablishAndSetMetadataSyncBareConnections establishes and sets
* connections used throughout nontransactional metadata sync.
*/
void
EstablishAndSetMetadataSyncBareConnections(MetadataSyncContext *context)
{
Assert(MetadataSyncTransMode == METADATA_SYNC_NON_TRANSACTIONAL);
int connectionFlags = REQUIRE_METADATA_CONNECTION;
/* establish bare connections to activated worker nodes */
List *bareConnectionList = NIL;
WorkerNode *node = NULL;
foreach_ptr(node, context->activatedWorkerNodeList)
{
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags,
node->workerName,
node->workerPort,
CurrentUserName(),
NULL);
Assert(connection != NULL);
bareConnectionList = lappend(bareConnectionList, connection);
}
context->activatedWorkerConnections = bareConnectionList;
}
/*
* EstablishAndSetMetadataSyncCoordinatedConnections establishes and sets
* connections used throughout transactional metadata sync.
*/
void
EstablishAndSetMetadataSyncCoordinatedConnections(MetadataSyncContext *context)
{
Assert(MetadataSyncTransMode == METADATA_SYNC_TRANSACTIONAL);
int connectionFlags = REQUIRE_METADATA_CONNECTION;
/* establish coordinated connections to activated worker nodes */
List *coordinatedConnectionList = NIL;
WorkerNode *node = NULL;
foreach_ptr(node, context->activatedWorkerNodeList)
{
MultiConnection *connection =
StartNodeConnection(connectionFlags, node->workerName, node->workerPort);
MarkRemoteTransactionCritical(connection);
Assert(connection != NULL);
coordinatedConnectionList = lappend(coordinatedConnectionList, connection);
}
context->activatedWorkerConnections = coordinatedConnectionList;
}
/*
* CreateMetadataSyncContext creates a context which contains worker connections
* and a MemoryContext to be used throughout the metadata sync.
*
* If we collect commands, connections will not be established as caller's intent
* is to collcet sync commands.
*/
MetadataSyncContext *
CreateMetadataSyncContext(List *nodeList, bool collectCommands)
{
/* should be alive during local transaction during the sync */
MemoryContext context = AllocSetContextCreate(TopTransactionContext,
"metadata_sync_context",
ALLOCSET_DEFAULT_SIZES);
MetadataSyncContext *metadataSyncContext = (MetadataSyncContext *) palloc0(
sizeof(MetadataSyncContext));
metadataSyncContext->context = context;
metadataSyncContext->transactionMode = MetadataSyncTransMode;
metadataSyncContext->collectCommands = collectCommands;
metadataSyncContext->collectedCommands = NIL;
/* filter the nodes that needs to be activated from given node list */
SetMetadataSyncNodesFromNodeList(metadataSyncContext, nodeList);
/* establish connections */
if (!collectCommands && MetadataSyncTransMode == METADATA_SYNC_TRANSACTIONAL)
{
EstablishAndSetMetadataSyncCoordinatedConnections(metadataSyncContext);
}
else if (!collectCommands && MetadataSyncTransMode == METADATA_SYNC_NON_TRANSACTIONAL)
{
EstablishAndSetMetadataSyncBareConnections(metadataSyncContext);
}
/* use 2PC coordinated transactions if we operate in transactional mode */
if (MetadataSyncTransMode == METADATA_SYNC_TRANSACTIONAL)
{
Use2PCForCoordinatedTransaction();
}
return metadataSyncContext;
}
/*
* DestroyMetadataSyncContext destroys the memory context inside metadataSyncContext
* and also closes open connections if any.
*/
void
DestroyMetadataSyncContext(MetadataSyncContext *context)
{
/* close connections */
MultiConnection *connection = NULL;
foreach_ptr(connection, context->activatedWorkerBareConnections)
{
CloseConnection(connection);
}
/* delete memory context */
MemoryContextDelete(context->context);
}
/*
* ResetMetadataSyncMemoryContext resets memory context inside metadataSyncContext, if
* we are not collecting commands.
*/
void
ResetMetadataSyncMemoryContext(MetadataSyncContext *context)
{
if (!MetadataSyncCollectsCommands(context))
{
MemoryContextReset(context->context);
}
}
/*
* MetadataSyncCollectsCommands returns whether context is used for collecting
* commands instead of sending them to workers.
*/
bool
MetadataSyncCollectsCommands(MetadataSyncContext *context)
{
return context->collectCommands;
}
/*
* SendOrCollectCommandListToActivatedNodes sends the commands to the activated nodes with
* bare connections inside metadatacontext or via coordinated connections.
* Note that when context only collects commands, we add commands into the context
* without sending the commands.
*/
void
SendOrCollectCommandListToActivatedNodes(MetadataSyncContext *context, List *commands)
{
/* do nothing if no commands */
if (commands == NIL)
{
return;
}
/*
* do not send any command to workers if we collect commands.
* Collect commands into metadataSyncContext's collected command
* list.
*/
if (MetadataSyncCollectsCommands(context))
{
context->collectedCommands = list_concat(context->collectedCommands,
commands);
return;
}
/* send commands to new workers, the current user should be a superuser */
Assert(superuser());
if (context->transactionMode == METADATA_SYNC_TRANSACTIONAL)
{
List *workerNodes = context->activatedWorkerNodeList;
SendMetadataCommandListToWorkerListInCoordinatedTransaction(workerNodes,
CurrentUserName(),
commands);
}
else if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL)
{
List *workerConnections = context->activatedWorkerBareConnections;
SendCommandListToWorkerListWithBareConnections(workerConnections, commands);
}
else
{
pg_unreachable();
}
}
/*
* SendOrCollectCommandListToMetadataNodes sends the commands to the metadata nodes with
* bare connections inside metadatacontext or via coordinated connections.
* Note that when context only collects commands, we add commands into the context
* without sending the commands.
*/
void
SendOrCollectCommandListToMetadataNodes(MetadataSyncContext *context, List *commands)
{
/*
* do not send any command to workers if we collcet commands.
* Collect commands into metadataSyncContext's collected command
* list.
*/
if (MetadataSyncCollectsCommands(context))
{
context->collectedCommands = list_concat(context->collectedCommands,
commands);
return;
}
/* send commands to new workers, the current user should be a superuser */
Assert(superuser());
if (context->transactionMode == METADATA_SYNC_TRANSACTIONAL)
{
List *metadataNodes = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
RowShareLock);
SendMetadataCommandListToWorkerListInCoordinatedTransaction(metadataNodes,
CurrentUserName(),
commands);
}
else if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL)
{
SendBareCommandListToMetadataWorkers(commands);
}
else
{
pg_unreachable();
}
}
/*
* SendOrCollectCommandListToSingleNode sends the commands to the specific worker
* indexed by nodeIdx with bare connection inside metadatacontext or via coordinated
* connection. Note that when context only collects commands, we add commands into
* the context without sending the commands.
*/
void
SendOrCollectCommandListToSingleNode(MetadataSyncContext *context, List *commands,
int nodeIdx)
{
/*
* Do not send any command to workers if we collect commands.
* Collect commands into metadataSyncContext's collected command
* list.
*/
if (MetadataSyncCollectsCommands(context))
{
context->collectedCommands = list_concat(context->collectedCommands,
commands);
return;
}
/* send commands to new workers, the current user should be a superuser */
Assert(superuser());
List *workerConnections = context->activatedWorkerConnections;
Assert(nodeIdx < list_length(workerConnections));
MultiConnection *workerConnection = list_nth(workerConnections, nodeIdx);
if (context->transactionMode == METADATA_SYNC_TRANSACTIONAL)
{
List *workerNodes = context->activatedWorkerNodeList;
Assert(nodeIdx < list_length(workerNodes));
WorkerNode *node = list_nth(workerNodes, nodeIdx);
SendMetadataCommandListToWorkerListInCoordinatedTransaction(list_make1(node),
CurrentUserName(),
commands);
}
else if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL)
{
List *workerConnections = context->activatedWorkerBareConnections;
Assert(nodeIdx < list_length(workerConnections));
MultiConnection *workerConnection = list_nth(workerConnections, nodeIdx);
List *connectionList = list_make1(workerConnection);
SendCommandListToWorkerListWithBareConnections(connectionList, commands);
}
else
{
pg_unreachable();
}
}

View File

@ -360,6 +360,11 @@ static const struct config_enum_entry cpu_priority_options[] = {
{ NULL, 0, false}
};
static const struct config_enum_entry metadata_sync_mode_options[] = {
{ "transactional", METADATA_SYNC_TRANSACTIONAL, false },
{ "nontransactional", METADATA_SYNC_NON_TRANSACTIONAL, false },
{ NULL, 0, false }
};
/* *INDENT-ON* */
@ -1880,6 +1885,21 @@ RegisterCitusConfigVariables(void)
GUC_UNIT_MS | GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomEnumVariable(
"citus.metadata_sync_mode",
gettext_noop("Sets transaction mode for metadata syncs."),
gettext_noop("metadata sync can be run inside a single coordinated "
"transaction or with multiple small transactions in "
"idempotent way. By default we sync metadata in single "
"coordinated transaction. When we hit memory problems "
"at workers, we have alternative nontransactional mode "
"where we send each command with separate transaction."),
&MetadataSyncTransMode,
METADATA_SYNC_TRANSACTIONAL, metadata_sync_mode_options,
PGC_SUSET,
GUC_SUPERUSER_ONLY | GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.metadata_sync_retry_interval",
gettext_noop("Sets the interval to retry failed metadata syncs."),

View File

@ -18,9 +18,30 @@
#include "distributed/metadata_cache.h"
#include "nodes/pg_list.h"
/* managed via guc.c */
typedef enum
{
METADATA_SYNC_TRANSACTIONAL = 0,
METADATA_SYNC_NON_TRANSACTIONAL = 1
} MetadataSyncTransactionMode;
/* config variables */
extern int MetadataSyncInterval;
extern int MetadataSyncRetryInterval;
extern int MetadataSyncTransMode;
/*
* MetadataSyncContext is used throughout metadata sync.
*/
typedef struct MetadataSyncContext
{
List *activatedWorkerNodeList; /* activated worker nodes */
List *activatedWorkerConnections; /* connections to activated worker nodes */
MemoryContext context; /* memory context for all allocations */
MetadataSyncTransactionMode transactionMode; /* transaction mode for the sync */
bool collectCommands; /* flag to collect commands instead of sending and resetting */
List *collectedCommands; /* collected commands. (NIL if collectCommands == false) */
} MetadataSyncContext;
typedef enum
{
@ -116,6 +137,22 @@ extern void SyncNewColocationGroupToNodes(uint32 colocationId, int shardCount,
Oid distributionColumnCollation);
extern void SyncDeleteColocationGroupToNodes(uint32 colocationId);
extern MetadataSyncContext * CreateMetadataSyncContext(List *nodeList, bool testMode);
extern void DestroyMetadataSyncContext(MetadataSyncContext *context);
extern void EstablishAndSetMetadataSyncBareConnections(MetadataSyncContext *context);
extern void EstablishAndSetMetadataSyncCoordinatedConnections(
MetadataSyncContext *context);
extern void SetMetadataSyncNodesFromNodeList(MetadataSyncContext *context,
List *nodeList);
extern void ResetMetadataSyncMemoryContext(MetadataSyncContext *context);
extern bool MetadataSyncCollectsCommands(MetadataSyncContext *context);
extern void SendOrCollectCommandListToActivatedNodes(MetadataSyncContext *context,
List *commands);
extern void SendOrCollectCommandListToMetadataNodes(MetadataSyncContext *context,
List *commands);
extern void SendOrCollectCommandListToSingleNode(MetadataSyncContext *context,
List *commands, int nodeIdx);
#define DELETE_ALL_NODES "DELETE FROM pg_dist_node"
#define DELETE_ALL_PLACEMENTS "DELETE FROM pg_dist_placement"
#define DELETE_ALL_SHARDS "DELETE FROM pg_dist_shard"