mirror of https://github.com/citusdata/citus.git
Remove all references for side channel connections
We don't need any side channel connections. That is actually problematic in the sense that it creates extra connections. Say, citus.max_adaptive_executor_pool_size equals to 1, Citus ends up using one extra connection for the intermediate results. Thus, not obeying citus.max_adaptive_executor_pool_size. In this PR, we remove the following entities from the codebase to allow further commits to implement not requiring extra connection for the intermediate results: - The connection flag REQUIRE_SIDECHANNEL - The function GivePurposeToConnection - The ConnectionPurpose struct and related fieldspull/3703/head
parent
e31dcff178
commit
9b29a32d7a
|
@ -54,7 +54,6 @@ static bool ShouldShutdownConnection(MultiConnection *connection, const int
|
||||||
static void ResetConnection(MultiConnection *connection);
|
static void ResetConnection(MultiConnection *connection);
|
||||||
static void DefaultCitusNoticeProcessor(void *arg, const char *message);
|
static void DefaultCitusNoticeProcessor(void *arg, const char *message);
|
||||||
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
|
static MultiConnection * FindAvailableConnection(dlist_head *connections, uint32 flags);
|
||||||
static void GivePurposeToConnection(MultiConnection *connection, int flags);
|
|
||||||
static bool RemoteTransactionIdle(MultiConnection *connection);
|
static bool RemoteTransactionIdle(MultiConnection *connection);
|
||||||
static int EventSetSizeForConnectionList(List *connections);
|
static int EventSetSizeForConnectionList(List *connections);
|
||||||
|
|
||||||
|
@ -314,8 +313,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
|
||||||
connection = FindAvailableConnection(entry->connections, flags);
|
connection = FindAvailableConnection(entry->connections, flags);
|
||||||
if (connection)
|
if (connection)
|
||||||
{
|
{
|
||||||
GivePurposeToConnection(connection, flags);
|
|
||||||
|
|
||||||
return connection;
|
return connection;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -330,7 +327,6 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
|
||||||
dlist_push_tail(entry->connections, &connection->connectionNode);
|
dlist_push_tail(entry->connections, &connection->connectionNode);
|
||||||
|
|
||||||
ResetShardPlacementAssociation(connection);
|
ResetShardPlacementAssociation(connection);
|
||||||
GivePurposeToConnection(connection, flags);
|
|
||||||
|
|
||||||
return connection;
|
return connection;
|
||||||
}
|
}
|
||||||
|
@ -338,10 +334,7 @@ StartNodeUserDatabaseConnection(uint32 flags, const char *hostname, int32 port,
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* FindAvailableConnection searches the given list of connections for one that
|
* FindAvailableConnection searches the given list of connections for one that
|
||||||
* is not claimed exclusively or marked as a side channel. If the caller passed
|
* is not claimed exclusively.
|
||||||
* the REQUIRE_SIDECHANNEL flag, it will only return a connection that has not
|
|
||||||
* been used to access shard placements and that connectoin will only be returned
|
|
||||||
* in subsequent calls if the REQUIRE_SIDECHANNEL flag is passed.
|
|
||||||
*
|
*
|
||||||
* If no connection is available, FindAvailableConnection returns NULL.
|
* If no connection is available, FindAvailableConnection returns NULL.
|
||||||
*/
|
*/
|
||||||
|
@ -382,56 +375,13 @@ FindAvailableConnection(dlist_head *connections, uint32 flags)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((flags & REQUIRE_SIDECHANNEL) != 0)
|
|
||||||
{
|
|
||||||
if (connection->purpose == CONNECTION_PURPOSE_SIDECHANNEL ||
|
|
||||||
connection->purpose == CONNECTION_PURPOSE_ANY)
|
|
||||||
{
|
|
||||||
/* side channel must not have been used to access data */
|
|
||||||
Assert(!ConnectionUsedForAnyPlacements(connection));
|
|
||||||
|
|
||||||
return connection;
|
return connection;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
else if (connection->purpose == CONNECTION_PURPOSE_DATA_ACCESS ||
|
|
||||||
connection->purpose == CONNECTION_PURPOSE_ANY)
|
|
||||||
{
|
|
||||||
/* can use this connection to access data */
|
|
||||||
return connection;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* GivePurposeToConnection gives purpose to a connection if it does not already
|
|
||||||
* have a purpose. More specifically, it marks the connection as a sidechannel
|
|
||||||
* if the REQUIRE_SIDECHANNEL flag is set.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
GivePurposeToConnection(MultiConnection *connection, int flags)
|
|
||||||
{
|
|
||||||
if (connection->purpose != CONNECTION_PURPOSE_ANY)
|
|
||||||
{
|
|
||||||
/* connection already has a purpose */
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((flags & REQUIRE_SIDECHANNEL) != 0)
|
|
||||||
{
|
|
||||||
/* connection should not be used for data access */
|
|
||||||
connection->purpose = CONNECTION_PURPOSE_SIDECHANNEL;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
/* connection should be used for data access */
|
|
||||||
connection->purpose = CONNECTION_PURPOSE_DATA_ACCESS;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CloseAllConnectionsAfterTransaction sets the forceClose flag of all the
|
* CloseAllConnectionsAfterTransaction sets the forceClose flag of all the
|
||||||
* connections. This is mainly done when citus.node_conninfo changes.
|
* connections. This is mainly done when citus.node_conninfo changes.
|
||||||
|
@ -1071,7 +1021,6 @@ StartConnectionEstablishment(ConnectionHashKey *key)
|
||||||
false);
|
false);
|
||||||
connection->connectionStart = GetCurrentTimestamp();
|
connection->connectionStart = GetCurrentTimestamp();
|
||||||
connection->connectionId = connectionId++;
|
connection->connectionId = connectionId++;
|
||||||
connection->purpose = CONNECTION_PURPOSE_ANY;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* To avoid issues with interrupts not getting caught all our connections
|
* To avoid issues with interrupts not getting caught all our connections
|
||||||
|
@ -1228,7 +1177,6 @@ ResetConnection(MultiConnection *connection)
|
||||||
|
|
||||||
/* reset copy state */
|
/* reset copy state */
|
||||||
connection->copyBytesWrittenSinceLastFlush = 0;
|
connection->copyBytesWrittenSinceLastFlush = 0;
|
||||||
connection->purpose = CONNECTION_PURPOSE_ANY;
|
|
||||||
|
|
||||||
UnclaimConnection(connection);
|
UnclaimConnection(connection);
|
||||||
}
|
}
|
||||||
|
|
|
@ -274,17 +274,11 @@ RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
|
||||||
WorkerNode *workerNode = NULL;
|
WorkerNode *workerNode = NULL;
|
||||||
foreach_ptr(workerNode, initialNodeList)
|
foreach_ptr(workerNode, initialNodeList)
|
||||||
{
|
{
|
||||||
|
int flags = 0;
|
||||||
|
|
||||||
const char *nodeName = workerNode->workerName;
|
const char *nodeName = workerNode->workerName;
|
||||||
int nodePort = workerNode->workerPort;
|
int nodePort = workerNode->workerPort;
|
||||||
|
|
||||||
/*
|
|
||||||
* We prefer to use a connection that is not associcated with
|
|
||||||
* any placements. The reason is that we claim this connection
|
|
||||||
* exclusively and that would prevent the consecutive DML/DDL
|
|
||||||
* use the same connection.
|
|
||||||
*/
|
|
||||||
int flags = REQUIRE_SIDECHANNEL;
|
|
||||||
|
|
||||||
MultiConnection *connection = StartNodeConnection(flags, nodeName, nodePort);
|
MultiConnection *connection = StartNodeConnection(flags, nodeName, nodePort);
|
||||||
ClaimConnectionExclusively(connection);
|
ClaimConnectionExclusively(connection);
|
||||||
MarkRemoteTransactionCritical(connection);
|
MarkRemoteTransactionCritical(connection);
|
||||||
|
|
|
@ -52,28 +52,9 @@ enum MultiConnectionMode
|
||||||
/* open a connection per (co-located set of) placement(s) */
|
/* open a connection per (co-located set of) placement(s) */
|
||||||
CONNECTION_PER_PLACEMENT = 1 << 3,
|
CONNECTION_PER_PLACEMENT = 1 << 3,
|
||||||
|
|
||||||
OUTSIDE_TRANSACTION = 1 << 4,
|
OUTSIDE_TRANSACTION = 1 << 4
|
||||||
|
|
||||||
/* connection has not been used to access data */
|
|
||||||
REQUIRE_SIDECHANNEL = 1 << 5
|
|
||||||
};
|
};
|
||||||
|
|
||||||
/*
|
|
||||||
* ConnectionPurpose defines what a connection is used for during the
|
|
||||||
* current transaction. This is primarily to not allocate connections
|
|
||||||
* that are needed for data access to other purposes.
|
|
||||||
*/
|
|
||||||
typedef enum ConnectionPurpose
|
|
||||||
{
|
|
||||||
/* connection can be used for any purpose */
|
|
||||||
CONNECTION_PURPOSE_ANY,
|
|
||||||
|
|
||||||
/* connection can be used to access placements */
|
|
||||||
CONNECTION_PURPOSE_DATA_ACCESS,
|
|
||||||
|
|
||||||
/* connection can be used for auxiliary functions, but not data access */
|
|
||||||
CONNECTION_PURPOSE_SIDECHANNEL
|
|
||||||
} ConnectionPurpose;
|
|
||||||
|
|
||||||
typedef enum MultiConnectionState
|
typedef enum MultiConnectionState
|
||||||
{
|
{
|
||||||
|
@ -116,9 +97,6 @@ typedef struct MultiConnection
|
||||||
/* is the connection currently in use, and shouldn't be used by anything else */
|
/* is the connection currently in use, and shouldn't be used by anything else */
|
||||||
bool claimedExclusively;
|
bool claimedExclusively;
|
||||||
|
|
||||||
/* defines the purpose of the connection */
|
|
||||||
ConnectionPurpose purpose;
|
|
||||||
|
|
||||||
/* time connection establishment was started, for timeout */
|
/* time connection establishment was started, for timeout */
|
||||||
TimestampTz connectionStart;
|
TimestampTz connectionStart;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue