diff --git a/src/backend/distributed/connection/connection_management.c b/src/backend/distributed/connection/connection_management.c index 1768575c7..dc53cdda5 100644 --- a/src/backend/distributed/connection/connection_management.c +++ b/src/backend/distributed/connection/connection_management.c @@ -56,6 +56,9 @@ static int ConnectionHashCompare(const void *a, const void *b, Size keysize); static void StartConnectionEstablishment(MultiConnection *connectionn, ConnectionHashKey *key); static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags); +#ifdef USE_ASSERT_CHECKING +static void AssertSingleMetadataConnectionExists(dlist_head *connections); +#endif static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry); static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); static bool ShouldShutdownConnection(MultiConnection *connection, const int @@ -329,6 +332,12 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, return connection; } } + else if (flags & REQUIRE_METADATA_CONNECTION) + { + /* FORCE_NEW_CONNECTION and REQUIRE_METADATA_CONNECTION are incompatible */ + ereport(ERROR, (errmsg("metadata connections cannot be forced to open " + "a new connection"))); + } /* @@ -389,6 +398,12 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port, ResetShardPlacementAssociation(connection); + + if ((flags & REQUIRE_METADATA_CONNECTION)) + { + connection->useForMetadataOperations = true; + } + /* fully initialized the connection, record it */ connection->initilizationState = POOL_STATE_INITIALIZED; @@ -406,7 +421,6 @@ static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags) { dlist_iter iter; - dlist_foreach(iter, connections) { MultiConnection *connection = @@ -454,13 +468,97 @@ FindAvailableConnection(dlist_head *connections, uint32 flags) continue; } + if ((flags & REQUIRE_METADATA_CONNECTION) && + !connection->useForMetadataOperations) + { + /* + * The caller requested a metadata connection, and this is not the + * metadata connection. + */ + continue; + } + else + { + /* + * Now that we found metadata connection. We do some sanity + * checks. + */ + #ifdef USE_ASSERT_CHECKING + AssertSingleMetadataConnectionExists(connections); + #endif + + /* + * Connection is in use for an ongoing operation. Metadata + * connection cannot be claimed exclusively. + */ + if (connection->claimedExclusively) + { + ereport(ERROR, (errmsg("metadata connections cannot be " + "claimed exclusively"))); + } + } + return connection; } + if ((flags & REQUIRE_METADATA_CONNECTION) && !dlist_is_empty(connections)) + { + /* + * Caller asked a metadata connection, and we couldn't find in the + * above list. So, we pick the first connection as the metadata + * connection. + */ + MultiConnection *metadataConnection = + dlist_container(MultiConnection, connectionNode, + dlist_head_node(connections)); + + + /* remember that we use this connection for metadata operations */ + metadataConnection->useForMetadataOperations = true; + + #ifdef USE_ASSERT_CHECKING + AssertSingleMetadataConnectionExists(connections); + #endif + + return metadataConnection; + } + return NULL; } +#ifdef USE_ASSERT_CHECKING + +/* + * AssertSingleMetadataConnectionExists throws an error if the + * input connection dlist contains more than one metadata connections. + */ +static void +AssertSingleMetadataConnectionExists(dlist_head *connections) +{ + bool foundMetadataConnection = false; + dlist_iter iter; + dlist_foreach(iter, connections) + { + MultiConnection *connection = + dlist_container(MultiConnection, connectionNode, iter.cur); + + if (connection->useForMetadataOperations) + { + if (foundMetadataConnection) + { + ereport(ERROR, (errmsg("cannot have multiple metadata connections"))); + } + + foundMetadataConnection = true; + } + } +} + + +#endif /* USE_ASSERT_CHECKING */ + + /* * CloseAllConnectionsAfterTransaction sets the forceClose flag of all the * connections. This is mainly done when citus.node_conninfo changes. diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index efac2d341..0e6c472c8 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -465,16 +465,16 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError) */ if (raiseOnError) { - SendCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, - workerNode->workerPort, - currentUser, - recreateMetadataSnapshotCommandList); + SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, + workerNode->workerPort, + currentUser, + recreateMetadataSnapshotCommandList); return true; } else { bool success = - SendOptionalCommandListToWorkerInCoordinatedTransaction( + SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction( workerNode->workerName, workerNode->workerPort, currentUser, recreateMetadataSnapshotCommandList); @@ -500,10 +500,11 @@ DropMetadataSnapshotOnNode(WorkerNode *workerNode) dropMetadataCommandList = lappend(dropMetadataCommandList, LocalGroupIdUpdateCommand(0)); - SendOptionalCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, - workerNode->workerPort, - userName, - dropMetadataCommandList); + SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction( + workerNode->workerName, + workerNode->workerPort, + userName, + dropMetadataCommandList); } diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 6d67219d0..cff6358fe 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -1639,7 +1639,7 @@ SetWorkerColumnOptional(WorkerNode *workerNode, int columnIndex, Datum value) WorkerNode *worker = NULL; foreach_ptr(worker, workerNodeList) { - bool success = SendOptionalCommandListToWorkerInCoordinatedTransaction( + bool success = SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction( worker->workerName, worker->workerPort, CurrentUserName(), list_make1(metadataSyncCommand)); diff --git a/src/backend/distributed/transaction/worker_transaction.c b/src/backend/distributed/transaction/worker_transaction.c index cece88323..61baff4fe 100644 --- a/src/backend/distributed/transaction/worker_transaction.c +++ b/src/backend/distributed/transaction/worker_transaction.c @@ -367,7 +367,7 @@ SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char * { const char *nodeName = workerNode->workerName; int nodePort = workerNode->workerPort; - int32 connectionFlags = 0; + int32 connectionFlags = REQUIRE_METADATA_CONNECTION; MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags, nodeName, nodePort, @@ -470,10 +470,12 @@ SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, * coordinated transaction. Any failures aborts the coordinated transaction. */ void -SendCommandListToWorkerInCoordinatedTransaction(const char *nodeName, int32 nodePort, - const char *nodeUser, List *commandList) +SendMetadataCommandListToWorkerInCoordinatedTransaction(const char *nodeName, + int32 nodePort, + const char *nodeUser, + List *commandList) { - int connectionFlags = 0; + int connectionFlags = REQUIRE_METADATA_CONNECTION; UseCoordinatedTransaction(); @@ -542,17 +544,17 @@ SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 no /* - * SendOptionalCommandListToWorkerInCoordinatedTransaction sends the given + * SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction sends the given * command list to the given worker as part of the coordinated transaction. * If any of the commands fail, the function returns false. */ bool -SendOptionalCommandListToWorkerInCoordinatedTransaction(const char *nodeName, int32 - nodePort, - const char *nodeUser, - List *commandList) +SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(const char *nodeName, + int32 nodePort, + const char *nodeUser, + List *commandList) { - int connectionFlags = 0; + int connectionFlags = REQUIRE_METADATA_CONNECTION; bool failed = false; UseCoordinatedTransaction(); diff --git a/src/include/distributed/connection_management.h b/src/include/distributed/connection_management.h index 5dffdef35..68dde4fe3 100644 --- a/src/include/distributed/connection_management.h +++ b/src/include/distributed/connection_management.h @@ -55,20 +55,33 @@ enum MultiConnectionMode OUTSIDE_TRANSACTION = 1 << 4, + /* + * All metadata changes should go through the same connection, otherwise + * self-deadlocks are possible. That is because the same metadata (e.g., + * metadata includes the distributed table on the workers) can be modified + * accross multiple connections. + * + * With this flag, we guarantee that there is a single metadata connection. + * But note that this connection can be used for any other operation. + * In other words, this connection is not exclusively reserved for metadata + * operations. + */ + REQUIRE_METADATA_CONNECTION = 1 << 5, + /* * Some connections are optional such as when adaptive executor is executing * a multi-shard command and requires the second (or further) connections * per node. In that case, the connection manager may decide not to allow the * connection. */ - OPTIONAL_CONNECTION = 1 << 5, + OPTIONAL_CONNECTION = 1 << 6, /* * When this flag is passed, via connection throttling, the connection * establishments may be suspended until a connection slot is available to * the remote host. */ - WAIT_FOR_CONNECTION = 1 << 6 + WAIT_FOR_CONNECTION = 1 << 7 }; @@ -133,6 +146,12 @@ typedef struct MultiConnection /* is the connection currently in use, and shouldn't be used by anything else */ bool claimedExclusively; + /* + * Should be used to access/modify metadata. See REQUIRE_METADATA_CONNECTION for + * the details. + */ + bool useForMetadataOperations; + /* time connection establishment was started, for timeout and executor stats */ instr_time connectionEstablishmentStart; instr_time connectionEstablishmentEnd; diff --git a/src/include/distributed/worker_transaction.h b/src/include/distributed/worker_transaction.h index 14daee4ef..63d419c66 100644 --- a/src/include/distributed/worker_transaction.h +++ b/src/include/distributed/worker_transaction.h @@ -41,10 +41,13 @@ extern bool SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeNa int32 nodePort, const char *nodeUser, List *commandList); -extern bool SendOptionalCommandListToWorkerInCoordinatedTransaction(const char *nodeName, - int32 nodePort, - const char *nodeUser, - List *commandList); +extern bool SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(const + char *nodeName, + int32 nodePort, + const char * + nodeUser, + List * + commandList); extern void SendCommandToWorkersWithMetadata(const char *command); extern void SendBareCommandListToMetadataWorkers(List *commandList); extern void EnsureNoModificationsHaveBeenDone(void); @@ -52,10 +55,10 @@ extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort, const char *nodeUser, List *commandList); -extern void SendCommandListToWorkerInCoordinatedTransaction(const char *nodeName, - int32 nodePort, - const char *nodeUser, - List *commandList); +extern void SendMetadataCommandListToWorkerInCoordinatedTransaction(const char *nodeName, + int32 nodePort, + const char *nodeUser, + List *commandList); extern void SendCommandToWorkersOptionalInParallel(TargetWorkerSet targetWorkerSet, const char *command, const char *user);