Merge pull request #5493 from citusdata/metadata_connecttion

Make sure to use a dedicated metadata connection
pull/5498/head
Önder Kalacı 2021-11-26 14:47:49 +01:00 committed by GitHub
commit 5ef0bae06f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 154 additions and 31 deletions

View File

@ -56,6 +56,9 @@ static int ConnectionHashCompare(const void *a, const void *b, Size keysize);
static void StartConnectionEstablishment(MultiConnection *connectionn, static void StartConnectionEstablishment(MultiConnection *connectionn,
ConnectionHashKey *key); ConnectionHashKey *key);
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags); static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
#ifdef USE_ASSERT_CHECKING
static void AssertSingleMetadataConnectionExists(dlist_head *connections);
#endif
static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry); static void FreeConnParamsHashEntryFields(ConnParamsHashEntry *entry);
static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit); static void AfterXactHostConnectionHandling(ConnectionHashEntry *entry, bool isCommit);
static bool ShouldShutdownConnection(MultiConnection *connection, const int static bool ShouldShutdownConnection(MultiConnection *connection, const int
@ -329,6 +332,12 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
return connection; return connection;
} }
} }
else if (flags & REQUIRE_METADATA_CONNECTION)
{
/* FORCE_NEW_CONNECTION and REQUIRE_METADATA_CONNECTION are incompatible */
ereport(ERROR, (errmsg("metadata connections cannot be forced to open "
"a new connection")));
}
/* /*
@ -389,6 +398,12 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
ResetShardPlacementAssociation(connection); ResetShardPlacementAssociation(connection);
if ((flags & REQUIRE_METADATA_CONNECTION))
{
connection->useForMetadataOperations = true;
}
/* fully initialized the connection, record it */ /* fully initialized the connection, record it */
connection->initilizationState = POOL_STATE_INITIALIZED; connection->initilizationState = POOL_STATE_INITIALIZED;
@ -406,7 +421,6 @@ static MultiConnection *
FindAvailableConnection(dlist_head *connections, uint32 flags) FindAvailableConnection(dlist_head *connections, uint32 flags)
{ {
dlist_iter iter; dlist_iter iter;
dlist_foreach(iter, connections) dlist_foreach(iter, connections)
{ {
MultiConnection *connection = MultiConnection *connection =
@ -454,13 +468,97 @@ FindAvailableConnection(dlist_head *connections, uint32 flags)
continue; continue;
} }
if ((flags & REQUIRE_METADATA_CONNECTION) &&
!connection->useForMetadataOperations)
{
/*
* The caller requested a metadata connection, and this is not the
* metadata connection.
*/
continue;
}
else
{
/*
* Now that we have found a usable connection (the metadata connection,
* if one was requested), we do some sanity checks.
*/
#ifdef USE_ASSERT_CHECKING
AssertSingleMetadataConnectionExists(connections);
#endif
/*
* Connection is in use for an ongoing operation. Metadata
* connection cannot be claimed exclusively.
*/
if (connection->claimedExclusively)
{
ereport(ERROR, (errmsg("metadata connections cannot be "
"claimed exclusively")));
}
}
return connection; return connection;
} }
if ((flags & REQUIRE_METADATA_CONNECTION) && !dlist_is_empty(connections))
{
/*
* The caller asked for a metadata connection, and we couldn't find one
* in the above list. So, we pick the first connection as the metadata
* connection.
*/
MultiConnection *metadataConnection =
dlist_container(MultiConnection, connectionNode,
dlist_head_node(connections));
/* remember that we use this connection for metadata operations */
metadataConnection->useForMetadataOperations = true;
#ifdef USE_ASSERT_CHECKING
AssertSingleMetadataConnectionExists(connections);
#endif
return metadataConnection;
}
return NULL; return NULL;
} }
#ifdef USE_ASSERT_CHECKING

/*
 * AssertSingleMetadataConnectionExists walks the given connection dlist and
 * raises an ERROR as soon as it sees a second connection whose
 * useForMetadataOperations flag is set. Lists with zero or one such
 * connection pass without any effect.
 *
 * connections: list of MultiConnection entries linked via connectionNode.
 */
static void
AssertSingleMetadataConnectionExists(dlist_head *connections)
{
	bool metadataConnectionSeen = false;
	dlist_iter connectionIter;

	dlist_foreach(connectionIter, connections)
	{
		MultiConnection *candidate =
			dlist_container(MultiConnection, connectionNode, connectionIter.cur);

		/* connections not marked for metadata operations are irrelevant here */
		if (!candidate->useForMetadataOperations)
		{
			continue;
		}

		/* a metadata connection was already seen earlier in the list */
		if (metadataConnectionSeen)
		{
			ereport(ERROR, (errmsg("cannot have multiple metadata connections")));
		}

		metadataConnectionSeen = true;
	}
}

#endif /* USE_ASSERT_CHECKING */
/* /*
* CloseAllConnectionsAfterTransaction sets the forceClose flag of all the * CloseAllConnectionsAfterTransaction sets the forceClose flag of all the
* connections. This is mainly done when citus.node_conninfo changes. * connections. This is mainly done when citus.node_conninfo changes.

View File

@ -465,16 +465,16 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
*/ */
if (raiseOnError) if (raiseOnError)
{ {
SendCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,
workerNode->workerPort, workerNode->workerPort,
currentUser, currentUser,
recreateMetadataSnapshotCommandList); recreateMetadataSnapshotCommandList);
return true; return true;
} }
else else
{ {
bool success = bool success =
SendOptionalCommandListToWorkerInCoordinatedTransaction( SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(
workerNode->workerName, workerNode->workerPort, workerNode->workerName, workerNode->workerPort,
currentUser, recreateMetadataSnapshotCommandList); currentUser, recreateMetadataSnapshotCommandList);
@ -500,10 +500,11 @@ DropMetadataSnapshotOnNode(WorkerNode *workerNode)
dropMetadataCommandList = lappend(dropMetadataCommandList, dropMetadataCommandList = lappend(dropMetadataCommandList,
LocalGroupIdUpdateCommand(0)); LocalGroupIdUpdateCommand(0));
SendOptionalCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(
workerNode->workerPort, workerNode->workerName,
userName, workerNode->workerPort,
dropMetadataCommandList); userName,
dropMetadataCommandList);
} }

View File

@ -1639,7 +1639,7 @@ SetWorkerColumnOptional(WorkerNode *workerNode, int columnIndex, Datum value)
WorkerNode *worker = NULL; WorkerNode *worker = NULL;
foreach_ptr(worker, workerNodeList) foreach_ptr(worker, workerNodeList)
{ {
bool success = SendOptionalCommandListToWorkerInCoordinatedTransaction( bool success = SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(
worker->workerName, worker->workerPort, worker->workerName, worker->workerPort,
CurrentUserName(), CurrentUserName(),
list_make1(metadataSyncCommand)); list_make1(metadataSyncCommand));

View File

@ -367,7 +367,7 @@ SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char *
{ {
const char *nodeName = workerNode->workerName; const char *nodeName = workerNode->workerName;
int nodePort = workerNode->workerPort; int nodePort = workerNode->workerPort;
int32 connectionFlags = 0; int32 connectionFlags = REQUIRE_METADATA_CONNECTION;
MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags, MultiConnection *connection = StartNodeUserDatabaseConnection(connectionFlags,
nodeName, nodePort, nodeName, nodePort,
@ -470,10 +470,12 @@ SendCommandListToWorkerOutsideTransaction(const char *nodeName, int32 nodePort,
* coordinated transaction. Any failures aborts the coordinated transaction. * coordinated transaction. Any failures aborts the coordinated transaction.
*/ */
void void
SendCommandListToWorkerInCoordinatedTransaction(const char *nodeName, int32 nodePort, SendMetadataCommandListToWorkerInCoordinatedTransaction(const char *nodeName,
const char *nodeUser, List *commandList) int32 nodePort,
const char *nodeUser,
List *commandList)
{ {
int connectionFlags = 0; int connectionFlags = REQUIRE_METADATA_CONNECTION;
UseCoordinatedTransaction(); UseCoordinatedTransaction();
@ -542,17 +544,17 @@ SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeName, int32 no
/* /*
* SendOptionalCommandListToWorkerInCoordinatedTransaction sends the given * SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction sends the given
* command list to the given worker as part of the coordinated transaction. * command list to the given worker as part of the coordinated transaction.
* If any of the commands fail, the function returns false. * If any of the commands fail, the function returns false.
*/ */
bool bool
SendOptionalCommandListToWorkerInCoordinatedTransaction(const char *nodeName, int32 SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(const char *nodeName,
nodePort, int32 nodePort,
const char *nodeUser, const char *nodeUser,
List *commandList) List *commandList)
{ {
int connectionFlags = 0; int connectionFlags = REQUIRE_METADATA_CONNECTION;
bool failed = false; bool failed = false;
UseCoordinatedTransaction(); UseCoordinatedTransaction();

View File

@ -55,20 +55,33 @@ enum MultiConnectionMode
OUTSIDE_TRANSACTION = 1 << 4, OUTSIDE_TRANSACTION = 1 << 4,
/*
* All metadata changes should go through the same connection, otherwise
* self-deadlocks are possible. That is because the same metadata (e.g.,
* metadata includes the distributed table on the workers) can be modified
* across multiple connections.
*
* With this flag, we guarantee that there is a single metadata connection.
* But note that this connection can be used for any other operation.
* In other words, this connection is not exclusively reserved for metadata
* operations.
*/
REQUIRE_METADATA_CONNECTION = 1 << 5,
/* /*
* Some connections are optional such as when adaptive executor is executing * Some connections are optional such as when adaptive executor is executing
* a multi-shard command and requires the second (or further) connections * a multi-shard command and requires the second (or further) connections
* per node. In that case, the connection manager may decide not to allow the * per node. In that case, the connection manager may decide not to allow the
* connection. * connection.
*/ */
OPTIONAL_CONNECTION = 1 << 5, OPTIONAL_CONNECTION = 1 << 6,
/* /*
* When this flag is passed, via connection throttling, the connection * When this flag is passed, via connection throttling, the connection
* establishments may be suspended until a connection slot is available to * establishments may be suspended until a connection slot is available to
* the remote host. * the remote host.
*/ */
WAIT_FOR_CONNECTION = 1 << 6 WAIT_FOR_CONNECTION = 1 << 7
}; };
@ -133,6 +146,12 @@ typedef struct MultiConnection
/* is the connection currently in use, and shouldn't be used by anything else */ /* is the connection currently in use, and shouldn't be used by anything else */
bool claimedExclusively; bool claimedExclusively;
/*
* Should be used to access/modify metadata. See REQUIRE_METADATA_CONNECTION for
* the details.
*/
bool useForMetadataOperations;
/* time connection establishment was started, for timeout and executor stats */ /* time connection establishment was started, for timeout and executor stats */
instr_time connectionEstablishmentStart; instr_time connectionEstablishmentStart;
instr_time connectionEstablishmentEnd; instr_time connectionEstablishmentEnd;

View File

@ -41,10 +41,13 @@ extern bool SendOptionalCommandListToWorkerOutsideTransaction(const char *nodeNa
int32 nodePort, int32 nodePort,
const char *nodeUser, const char *nodeUser,
List *commandList); List *commandList);
extern bool SendOptionalCommandListToWorkerInCoordinatedTransaction(const char *nodeName, extern bool SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(const
int32 nodePort, char *nodeName,
const char *nodeUser, int32 nodePort,
List *commandList); const char *
nodeUser,
List *
commandList);
extern void SendCommandToWorkersWithMetadata(const char *command); extern void SendCommandToWorkersWithMetadata(const char *command);
extern void SendBareCommandListToMetadataWorkers(List *commandList); extern void SendBareCommandListToMetadataWorkers(List *commandList);
extern void EnsureNoModificationsHaveBeenDone(void); extern void EnsureNoModificationsHaveBeenDone(void);
@ -52,10 +55,10 @@ extern void SendCommandListToWorkerOutsideTransaction(const char *nodeName,
int32 nodePort, int32 nodePort,
const char *nodeUser, const char *nodeUser,
List *commandList); List *commandList);
extern void SendCommandListToWorkerInCoordinatedTransaction(const char *nodeName, extern void SendMetadataCommandListToWorkerInCoordinatedTransaction(const char *nodeName,
int32 nodePort, int32 nodePort,
const char *nodeUser, const char *nodeUser,
List *commandList); List *commandList);
extern void SendCommandToWorkersOptionalInParallel(TargetWorkerSet targetWorkerSet, extern void SendCommandToWorkersOptionalInParallel(TargetWorkerSet targetWorkerSet,
const char *command, const char *command,
const char *user); const char *user);