From 85d50203d1da9ded88d8ff26dcee870ecdcceb0d Mon Sep 17 00:00:00 2001 From: aykutbozkurt Date: Tue, 28 Mar 2023 13:01:45 +0300 Subject: [PATCH] =?UTF-8?q?PR=20#6728=20=C2=A0/=20commit=20-=202?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Create MetadataSyncContext api to encapsulate both transactional and nontransactional modes, - Add a GUC to switch between metadata sync transaction modes. --- .../distributed/metadata/metadata_sync.c | 328 ++++++++++++++++++ src/backend/distributed/shared_library_init.c | 20 ++ src/include/distributed/metadata_sync.h | 37 ++ 3 files changed, 385 insertions(+) diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 494041bea..c210bd914 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -90,6 +90,7 @@ /* managed via a GUC */ char *EnableManualMetadataChangesForUser = ""; +int MetadataSyncTransMode = METADATA_SYNC_TRANSACTIONAL; static void EnsureObjectMetadataIsSane(int distributionArgumentIndex, @@ -4157,3 +4158,330 @@ ColocationGroupCreateCommandList(void) return list_make1(colocationGroupCreateCommand->data); } + + +/* + * SetMetadataSyncNodesFromNodeList filters the given node list down to the nodes that + * need to be metadata synced and stores that list in metadataSyncContext. 
+ */ +void +SetMetadataSyncNodesFromNodeList(MetadataSyncContext *context, List *nodeList) +{ + List *activatedWorkerNodeList = NIL; + + WorkerNode *node = NULL; + foreach_ptr(node, nodeList) + { + if (EnableMetadataSync && NodeIsPrimary(node)) + { + /* skip the coordinator with a NOTICE, it already contains the metadata */ + if (NodeIsCoordinator(node)) + { + ereport(NOTICE, (errmsg("%s:%d is the coordinator and already contains " + "metadata, skipping syncing the metadata", + node->workerName, node->workerPort))); + continue; + } + + activatedWorkerNodeList = lappend(activatedWorkerNodeList, node); + } + } + + context->activatedWorkerNodeList = activatedWorkerNodeList; +} + + +/* + * EstablishAndSetMetadataSyncBareConnections opens one connection per activated worker + * node, in node list order, and stores them in the context for nontransactional sync. + */ +void +EstablishAndSetMetadataSyncBareConnections(MetadataSyncContext *context) +{ + Assert(MetadataSyncTransMode == METADATA_SYNC_NON_TRANSACTIONAL); + + int connectionFlags = REQUIRE_METADATA_CONNECTION; + + /* establish bare connections to activated worker nodes */ + List *bareConnectionList = NIL; + WorkerNode *node = NULL; + foreach_ptr(node, context->activatedWorkerNodeList) + { + MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags, + node->workerName, + node->workerPort, + CurrentUserName(), + NULL); + + Assert(connection != NULL); + bareConnectionList = lappend(bareConnectionList, connection); + } + + context->activatedWorkerConnections = bareConnectionList; +} + + +/* + * EstablishAndSetMetadataSyncCoordinatedConnections starts one connection per activated + * worker node, in node list order, and stores them in the context for transactional sync. 
+ */ +void +EstablishAndSetMetadataSyncCoordinatedConnections(MetadataSyncContext *context) +{ + Assert(MetadataSyncTransMode == METADATA_SYNC_TRANSACTIONAL); + + int connectionFlags = REQUIRE_METADATA_CONNECTION; + + /* establish coordinated connections to activated worker nodes */ + List *coordinatedConnectionList = NIL; + WorkerNode *node = NULL; + foreach_ptr(node, context->activatedWorkerNodeList) + { + MultiConnection *connection = + StartNodeConnection(connectionFlags, node->workerName, node->workerPort); + + /* validate the connection before it is used */ + Assert(connection != NULL); + MarkRemoteTransactionCritical(connection); + coordinatedConnectionList = lappend(coordinatedConnectionList, connection); + } + + context->activatedWorkerConnections = coordinatedConnectionList; +} + + +/* + * CreateMetadataSyncContext creates a context which contains worker connections + * and a MemoryContext to be used throughout the metadata sync. + * + * If we collect commands, connections will not be established as caller's intent + * is to collect sync commands. 
+ */ +MetadataSyncContext * +CreateMetadataSyncContext(List *nodeList, bool collectCommands) +{ + /* should be alive during local transaction during the sync */ + MemoryContext context = AllocSetContextCreate(TopTransactionContext, + "metadata_sync_context", + ALLOCSET_DEFAULT_SIZES); + + MetadataSyncContext *metadataSyncContext = (MetadataSyncContext *) palloc0( + sizeof(MetadataSyncContext)); + + metadataSyncContext->context = context; + metadataSyncContext->transactionMode = MetadataSyncTransMode; + metadataSyncContext->collectCommands = collectCommands; + metadataSyncContext->collectedCommands = NIL; + + /* filter the nodes that need to be activated from given node list */ + SetMetadataSyncNodesFromNodeList(metadataSyncContext, nodeList); + + /* establish connections, unless we only collect commands */ + if (!collectCommands && MetadataSyncTransMode == METADATA_SYNC_TRANSACTIONAL) + { + EstablishAndSetMetadataSyncCoordinatedConnections(metadataSyncContext); + } + else if (!collectCommands && MetadataSyncTransMode == METADATA_SYNC_NON_TRANSACTIONAL) + { + EstablishAndSetMetadataSyncBareConnections(metadataSyncContext); + } + + /* use 2PC coordinated transactions if we operate in transactional mode */ + if (MetadataSyncTransMode == METADATA_SYNC_TRANSACTIONAL) + { + Use2PCForCoordinatedTransaction(); + } + + return metadataSyncContext; +} + + +/* + * DestroyMetadataSyncContext destroys the memory context inside metadataSyncContext + * and also closes open connections if any. + */ +void +DestroyMetadataSyncContext(MetadataSyncContext *context) +{ + /* close connections, if any; NOTE(review): confirm this is safe for coordinated ones */ + MultiConnection *connection = NULL; + foreach_ptr(connection, context->activatedWorkerConnections) + { + CloseConnection(connection); + } + + /* delete memory context */ + MemoryContextDelete(context->context); +} + + +/* + * ResetMetadataSyncMemoryContext resets memory context inside metadataSyncContext, if + * we are not collecting commands. 
+ */ +void +ResetMetadataSyncMemoryContext(MetadataSyncContext *context) +{ + if (!MetadataSyncCollectsCommands(context)) + { + MemoryContextReset(context->context); + } +} + + +/* + * MetadataSyncCollectsCommands returns whether context is used for collecting + * commands instead of sending them to workers. + */ +bool +MetadataSyncCollectsCommands(MetadataSyncContext *context) +{ + return context->collectCommands; +} + + +/* + * SendOrCollectCommandListToActivatedNodes sends the commands to the activated nodes, + * over the bare connections stored in the context or in a coordinated transaction. + * Note that when context only collects commands, we add commands into the context + * without sending the commands. + */ +void +SendOrCollectCommandListToActivatedNodes(MetadataSyncContext *context, List *commands) +{ + /* do nothing if no commands */ + if (commands == NIL) + { + return; + } + + /* + * do not send any command to workers if we collect commands. + * Collect commands into metadataSyncContext's collected command + * list. + */ + if (MetadataSyncCollectsCommands(context)) + { + context->collectedCommands = list_concat(context->collectedCommands, + commands); + return; + } + + /* send commands to new workers, the current user should be a superuser */ + Assert(superuser()); + + if (context->transactionMode == METADATA_SYNC_TRANSACTIONAL) + { + List *workerNodes = context->activatedWorkerNodeList; + SendMetadataCommandListToWorkerListInCoordinatedTransaction(workerNodes, + CurrentUserName(), + commands); + } + else if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL) + { + List *workerConnections = context->activatedWorkerConnections; + SendCommandListToWorkerListWithBareConnections(workerConnections, commands); + } + else + { + pg_unreachable(); + } +} + + +/* + * SendOrCollectCommandListToMetadataNodes sends the commands to the metadata nodes, + * as bare commands or in a coordinated transaction, depending on the transaction mode. 
+ * Note that when context only collects commands, we add commands into the context + * without sending the commands. + */ +void +SendOrCollectCommandListToMetadataNodes(MetadataSyncContext *context, List *commands) +{ + /* + * do not send any command to workers if we collect commands. + * Collect commands into metadataSyncContext's collected command + * list. + */ + if (MetadataSyncCollectsCommands(context)) + { + context->collectedCommands = list_concat(context->collectedCommands, + commands); + return; + } + + /* send commands to metadata nodes, the current user should be a superuser */ + Assert(superuser()); + + if (context->transactionMode == METADATA_SYNC_TRANSACTIONAL) + { + List *metadataNodes = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES, + RowShareLock); + SendMetadataCommandListToWorkerListInCoordinatedTransaction(metadataNodes, + CurrentUserName(), + commands); + } + else if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL) + { + SendBareCommandListToMetadataWorkers(commands); + } + else + { + pg_unreachable(); + } +} + + +/* + * SendOrCollectCommandListToSingleNode sends the commands to the specific worker + * indexed by nodeIdx with bare connection inside metadatacontext or via coordinated + * connection. Note that when context only collects commands, we add commands into + * the context without sending the commands. + */ +void +SendOrCollectCommandListToSingleNode(MetadataSyncContext *context, List *commands, + int nodeIdx) +{ + /* + * Do not send any command to workers if we collect commands. + * Collect commands into metadataSyncContext's collected command + * list. 
+ */ + if (MetadataSyncCollectsCommands(context)) + { + context->collectedCommands = list_concat(context->collectedCommands, + commands); + return; + } + + /* send commands to new workers, the current user should be a superuser */ + Assert(superuser()); + + /* + * nodeIdx indexes activatedWorkerNodeList and activatedWorkerConnections alike + */ + + if (context->transactionMode == METADATA_SYNC_TRANSACTIONAL) + { + List *workerNodes = context->activatedWorkerNodeList; + Assert(nodeIdx < list_length(workerNodes)); + + WorkerNode *node = list_nth(workerNodes, nodeIdx); + SendMetadataCommandListToWorkerListInCoordinatedTransaction(list_make1(node), + CurrentUserName(), + commands); + } + else if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL) + { + List *workerConnections = context->activatedWorkerConnections; + Assert(nodeIdx < list_length(workerConnections)); + + MultiConnection *workerConnection = list_nth(workerConnections, nodeIdx); + List *connectionList = list_make1(workerConnection); + SendCommandListToWorkerListWithBareConnections(connectionList, commands); + } + else + { + pg_unreachable(); + } +} diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index bd9ca679b..23393078b 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -360,6 +360,11 @@ static const struct config_enum_entry cpu_priority_options[] = { { NULL, 0, false} }; +static const struct config_enum_entry metadata_sync_mode_options[] = { + { "transactional", METADATA_SYNC_TRANSACTIONAL, false }, + { "nontransactional", METADATA_SYNC_NON_TRANSACTIONAL, false }, + { NULL, 0, false } +}; /* *INDENT-ON* */ @@ -1880,6 +1885,21 @@ RegisterCitusConfigVariables(void) GUC_UNIT_MS | GUC_NO_SHOW_ALL, NULL, NULL, NULL); + DefineCustomEnumVariable( + "citus.metadata_sync_mode", 
+ gettext_noop("Sets transaction mode for metadata syncs."), + gettext_noop("metadata sync can be run inside a single coordinated " + "transaction or with multiple small transactions in " + "idempotent way. By default we sync metadata in single " + "coordinated transaction. When we hit memory problems " + "at workers, we have alternative nontransactional mode " + "where we send each command with separate transaction."), + &MetadataSyncTransMode, + METADATA_SYNC_TRANSACTIONAL, metadata_sync_mode_options, + PGC_SUSET, + GUC_SUPERUSER_ONLY | GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + DefineCustomIntVariable( "citus.metadata_sync_retry_interval", gettext_noop("Sets the interval to retry failed metadata syncs."), diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 11140beff..89a859a7f 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -18,9 +18,30 @@ #include "distributed/metadata_cache.h" #include "nodes/pg_list.h" +/* transaction modes selectable via the citus.metadata_sync_mode GUC */ +typedef enum +{ + METADATA_SYNC_TRANSACTIONAL = 0, + METADATA_SYNC_NON_TRANSACTIONAL = 1 +} MetadataSyncTransactionMode; + /* config variables */ extern int MetadataSyncInterval; extern int MetadataSyncRetryInterval; +extern int MetadataSyncTransMode; + +/* + * MetadataSyncContext holds the target nodes, connections, mode and collected commands of a sync. + */ +typedef struct MetadataSyncContext +{ + List *activatedWorkerNodeList; /* activated worker nodes */ + List *activatedWorkerConnections; /* connections to activated worker nodes, in node list order */ + MemoryContext context; /* memory context for all allocations */ + MetadataSyncTransactionMode transactionMode; /* transaction mode for the sync */ + bool collectCommands; /* flag to collect commands instead of sending and resetting */ + List *collectedCommands; /* collected commands. 
(NIL if collectCommands == false) */ +} MetadataSyncContext; typedef enum { @@ -116,6 +137,22 @@ extern void SyncNewColocationGroupToNodes(uint32 colocationId, int shardCount, Oid distributionColumnCollation); extern void SyncDeleteColocationGroupToNodes(uint32 colocationId); +extern MetadataSyncContext * CreateMetadataSyncContext(List *nodeList, bool collectCommands); +extern void DestroyMetadataSyncContext(MetadataSyncContext *context); +extern void EstablishAndSetMetadataSyncBareConnections(MetadataSyncContext *context); +extern void EstablishAndSetMetadataSyncCoordinatedConnections( + MetadataSyncContext *context); +extern void SetMetadataSyncNodesFromNodeList(MetadataSyncContext *context, + List *nodeList); +extern void ResetMetadataSyncMemoryContext(MetadataSyncContext *context); +extern bool MetadataSyncCollectsCommands(MetadataSyncContext *context); +extern void SendOrCollectCommandListToActivatedNodes(MetadataSyncContext *context, + List *commands); +extern void SendOrCollectCommandListToMetadataNodes(MetadataSyncContext *context, + List *commands); +extern void SendOrCollectCommandListToSingleNode(MetadataSyncContext *context, + List *commands, int nodeIdx); + #define DELETE_ALL_NODES "DELETE FROM pg_dist_node" #define DELETE_ALL_PLACEMENTS "DELETE FROM pg_dist_placement" #define DELETE_ALL_SHARDS "DELETE FROM pg_dist_shard"