1. Basic structure
The entry-point type ConnectionPool consists of a HashMap field pool and an Lru field lru.
/// Connection pool
///
/// [ConnectionPool] holds reusable connections. A reusable connection is released to this pool to
/// be picked up by another user/request.
pub struct ConnectionPool<S> {
// TODO: n-way pools to reduce lock contention
pool: RwLock<HashMap<GroupKey, Arc<PoolNode<PoolConnection<S>>>>>,
lru: Lru<ID, ConnectionMeta>,
}
The keys of pool are of type GroupKey, which is really just a u64; the values are PoolNode instances wrapped in an Arc.
The lru key type ID is an i32, and its value type ConnectionMeta is defined as follows:
/// the metadata of a connection
#[derive(Clone, Debug)]
pub struct ConnectionMeta {
/// The group key. All connections under the same key are considered the same for connection reuse.
pub key: GroupKey,
/// The unique ID of a connection.
pub id: ID,
}
So ConnectionMeta is essentially just the pool key plus the lru key. Let's distinguish the two:
- GroupKey: the key of pool. When we are about to send an HTTP request and several existing TCP connections could all serve it, those connections belong to the same host and share the same upstream socket address. When such connections are stored in the connection pool they should share the same GroupKey, so the GroupKey is most likely computed by hashing the host, the upstream socket address, and similar values (a hedged sketch follows this list).
- id: the key of lru, identifying one specific connection.
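As a rough illustration of that guess (pingora's actual key derivation is not shown in this post, and the field choices below are assumptions), a group key could be computed along these lines:

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::net::SocketAddr;

type GroupKey = u64;

// Hypothetical reuse-key derivation: connections that hash to the same
// GroupKey are considered interchangeable for reuse.
fn group_key(host: &str, upstream_addr: SocketAddr, tls: bool) -> GroupKey {
    let mut hasher = DefaultHasher::new();
    host.hash(&mut hasher);
    upstream_addr.hash(&mut hasher);
    tls.hash(&mut hasher);
    hasher.finish()
}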
The design intent is therefore clear: pool stores all of the kept-alive connections, and lru implements least-recently-used eviction. That is also why the lru maps from its own key (the connection ID) to the pool key (inside ConnectionMeta): when an eviction happens, we can locate the right PoolNode and delete the connection that is no longer needed.
Below, we start from the details and look at the design of pool and lru individually, then come back to ConnectionPool itself and examine the design of the outermost interface.
2. Design of the lru
Let's look at the struct definition first.
pub struct Lru<K, T>
where
K: Send,
T: Send,
{
lru: RwLock<ThreadLocal<RefCell<LruCache<K, Node<T>>>>>,
size: usize,
drain: AtomicBool,
}
pub struct Node<T> {
pub close_notifier: Arc<Notify>,
pub meta: T,
}
The design is fairly simple: it is a thread-local wrapper around LruCache, a structure provided by the crate of the same name.
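For readers unfamiliar with the thread_local crate, here is a minimal standalone illustration of the get_or() pattern that the put() method shown later relies on (this is not pingora code):

use std::cell::RefCell;
use thread_local::ThreadLocal;

fn main() {
    // Each thread lazily creates its own RefCell-wrapped value via get_or();
    // mutation then goes through the RefCell, so no cross-thread lock is
    // needed on the per-thread fast path.
    let tls: ThreadLocal<RefCell<Vec<u32>>> = ThreadLocal::new();
    let cell = tls.get_or(|| RefCell::new(Vec::new()));
    cell.borrow_mut().push(1);
    assert_eq!(cell.borrow().len(), 1);
}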
2.1. close_notifier
Each lru node also carries an Arc<Notify>, a synchronization primitive provided by tokio. Unlike another common notification tool, the oneshot-style channel, Notify itself cannot carry data of any kind.
The Node implementation includes a notify_close() method:
impl<T> Node<T> {
// ... omitted
pub fn notify_close(&self) {
self.close_notifier.notify_one();
}
}
Obviously, this is used to perform some notification when a Node is evicted by the lru algorithm. As we will see later, in the pingora connection pool this notification sends the evicted connection into its teardown path.
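For context, here is a minimal standalone sketch of the listening side of such a notifier, using tokio's Notify directly (this is not pingora code):

use std::sync::Arc;
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let close_notifier = Arc::new(Notify::new());
    let listener = close_notifier.clone();

    let task = tokio::spawn(async move {
        // Wait until the pool decides this connection should be torn down.
        listener.notified().await;
        println!("evicted: close the connection");
    });

    // Elsewhere, the lru evicts the node and calls notify_one(); the permit
    // is stored, so the waiter wakes even if it starts waiting afterwards.
    close_notifier.notify_one();
    task.await.unwrap();
}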
2.2. The lru add operation
The add operation is also straightforward: create an lru node and return its close notifier. If an eviction happens, call notify_close() on the evicted node and return that node's data as well. Note that in put() below a read lock on the outer RwLock is sufficient, because each thread only mutates its own thread-local LruCache through the RefCell.
pub fn add(&self, key: K, meta: T) -> (Arc<Notify>, Option<T>) {
let node = Node::new(meta);
let notifier = node.close_notifier.clone();
// TODO: check if the key is already in it
(notifier, self.put(key, node))
}
// put a node in and return the meta of the replaced node
pub fn put(&self, key: K, value: Node<T>) -> Option<T> {
if self.drain.load(Relaxed) {
value.notify_close(); // sort of hack to simulate being evicted right away
return None;
}
let lru = self.lru.read(); /* read lock */
let lru_cache = &mut *(lru
.get_or(|| RefCell::new(LruCache::unbounded()))
.borrow_mut());
lru_cache.put(key, value);
if lru_cache.len() > self.size {
match lru_cache.pop_lru() {
Some((_, v)) => {
// TODO: drop the lock here?
v.notify_close();
return Some(v.meta);
}
None => return None,
}
}
None
/* read lock dropped */
}
The drain flag above is a reserved feature that the connection pool never actually uses; its purpose is to evict every node in the lru. When that process starts, the atomic self.drain is first set to true so that all further add operations are rejected.
Since the code provides no way to set self.drain back to false, an lru that has been drained can never be used again, so this interface has no practical purpose.
3. Design of the pool
As mentioned earlier, pool is a hashmap whose keys are u64 and whose values are of type Arc<PoolNode<PoolConnection<S>>>.
3.1. Design of PoolNode
A PoolNode consists of a hot queue, hot_queue, and a HashMap, connections, which holds all of the remaining non-hot connections.
Note that hot_queue, which comes from the crossbeam_queue crate, is itself lock-free; however, when a specific id needs to be removed from the queue, a lock is still required so that two concurrent removals do not race and delete entries twice.
/// A pool of exchangeable items
pub struct PoolNode<T> {
connections: Mutex<HashMap<ID, T>>,
// a small lock free queue to avoid lock contention
hot_queue: ArrayQueue<(ID, T)>,
// to avoid race between 2 evictions on the queue
hot_queue_remove_lock: Mutex<()>,
// TODO: store the GroupKey to avoid hash collision?
}
With hot_queue in place, taking a connection first tries to pop a hot connection; only if that fails do we fall back to taking one from connections:
/// Get any item from the pool
pub fn get_any(&self) -> Option<(ID, T)> {
let hot_conn = self.hot_queue.pop();
if hot_conn.is_some() {
return hot_conn;
}
let mut connections = self.connections.lock();
// find one connection, any connection will do
let id = match connections.iter().next() {
Some((k, _)) => *k, // OK to copy i32
None => return None,
};
// unwrap is safe since we just found it
let connection = connections.remove(&id).unwrap();
/* NOTE: we don't resize or drop empty connections hashmap
* We may want to do it if they consume too much memory
* maybe we should use trees to save memory */
Some((id, connection))
// connections.lock released here
}
The interesting part of this design is that hot_queue itself is lock-free, which significantly reduces lock contention on connections.
When putting a connection back into the pool, we naturally try hot_queue first as well; if the hot queue is full, the connection being inserted is stored in the hashmap instead (push() hands the rejected item back rather than displacing an existing hot entry, as the standalone example after the code below shows).
/// Insert an item with the given unique ID into the pool
pub fn insert(&self, id: ID, conn: T) {
if let Err(node) = self.hot_queue.push((id, conn)) {
// hot queue is full
let mut connections = self.connections.lock();
connections.insert(node.0, node.1); // TODO: check dup
}
}
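As a side note, the full-queue behavior that insert() relies on can be seen in this small standalone crossbeam_queue example (not pingora code):

use crossbeam_queue::ArrayQueue;

fn main() {
    // A hot queue with room for two items.
    let queue: ArrayQueue<(i32, &str)> = ArrayQueue::new(2);

    assert!(queue.push((1, "conn-1")).is_ok());
    assert!(queue.push((2, "conn-2")).is_ok());

    // The queue is full: push() does not evict an existing item, it hands
    // the rejected item back to the caller...
    let rejected = queue.push((3, "conn-3")).unwrap_err();
    assert_eq!(rejected, (3, "conn-3"));
    // ...and that rejected item is what PoolNode::insert() falls back to
    // storing in the connections hashmap.
}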
When a connection is evicted by the lru and the entry with a given id has to be removed from its PoolNode, the hashmap is checked first under its lock (a direct keyed lookup); if the id is not found there, the hot_queue removal lock is taken and the hot queue is scanned.
// This function acquires 2 locks and iterates over the entire hot queue.
// But it should be fine because remove() rarely happens on a busy PoolNode.
/// Remove the item associated with the id from the pool. The item is returned
/// if it is found and removed.
pub fn remove(&self, id: ID) -> Option<T> {
// check the table first as least recent used ones are likely there
let removed = self.connections.lock().remove(&id);
if removed.is_some() {
return removed;
} // lock drops here
let _queue_lock = self.hot_queue_remove_lock.lock();
// check the hot queue, note that the queue can be accessed in parallel by insert and get
let max_len = self.hot_queue.len();
for _ in 0..max_len {
if let Some((conn_id, conn)) = self.hot_queue.pop() {
if conn_id == id {
// this is the item, it is already popped
return Some(conn);
} else {
// not this item, put back to hot queue, but it could also be full
self.insert(conn_id, conn);
}
} else {
// other threads grab all the connections
return None;
}
}
None
// _queue_lock drops here
}
3.2. Design of PoolConnection
This one is simple; the main addition is a oneshot channel used to send a notification in certain situations. As we will see later, the actual purpose of release() is that when a connection is taken out of the pool, this channel is used to shut down the connection's idle-timeout watcher.
struct PoolConnection<S> {
pub notify_use: oneshot::Sender<bool>,
pub connection: S,
}
impl<S> PoolConnection<S> {
pub fn new(notify_use: oneshot::Sender<bool>, connection: S) -> Self {
PoolConnection {
notify_use,
connection,
}
}
pub fn release(self) -> S {
// notify the idle watcher to release the connection
let _ = self.notify_use.send(true);
// wait for the watcher to release
self.connection
}
}
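A minimal standalone demonstration of the oneshot handshake that release() performs; the surrounding setup here is only for illustration (not pingora code):

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (notify_use, watch_use) = oneshot::channel::<bool>();

    // The idle watcher waits on the receiving end.
    let watcher = tokio::spawn(async move {
        if watch_use.await.is_ok() {
            println!("connection was picked up, stop the idle watcher");
        }
    });

    // When the connection is taken out of the pool, release() fires the sender.
    let _ = notify_use.send(true);
    watcher.await.unwrap();
}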
4. The top-level interface: the design of ConnectionPool
4.1. Putting a connection in
The logic holds few surprises:
- First record the connection in the lru.
- If the lru evicted an entry, remove the corresponding connection from the pool.
- Look up the PoolNode for the GroupKey and store the connection in it.
- Return two values: the Notify that fires when the connection is evicted, and the channel used to notify the idle watcher when the connection is picked up.
/// Release a connection to this pool for reuse
///
/// - The returned [`Arc<Notify>`] will notify any listen when the connection is evicted from the pool.
/// - The returned [`oneshot::Receiver<bool>`] will notify when the connection is being picked up by [Self::get()].
pub fn put(
    &self,
    meta: &ConnectionMeta,
    connection: S,
) -> (Arc<Notify>, oneshot::Receiver<bool>) {
    let (notify_close, replaced) = self.lru.add(meta.id, meta.clone());
    if let Some(meta) = replaced {
        self.pop_evicted(&meta);
    };
    let pool_node = self.get_pool_node(meta.key);
    let (notify_use, watch_use) = oneshot::channel();
    let connection = PoolConnection::new(notify_use, connection);
    pool_node.insert(meta.id, connection);
    (notify_close, watch_use)
}
Take a moment to look at how the corresponding PoolNode is obtained: when no new node needs to be created, only the read lock is taken, which helps performance.
/* get or create and insert a pool node for the hash key */
fn get_pool_node(&self, key: GroupKey) -> Arc<PoolNode<PoolConnection<S>>> {
{
let pool = self.pool.read();
if let Some(v) = pool.get(&key) {
return (*v).clone();
}
} // read lock released here
{
// write lock section
let mut pool = self.pool.write();
// check again since another task might have already added it
if let Some(v) = pool.get(&key) {
return (*v).clone();
}
let node = Arc::new(PoolNode::new());
let node_ret = node.clone();
pool.insert(key, node); // TODO: check dup
node_ret
}
}
4.2. Taking a connection out
Again, no surprises: find the node for the given group key, then call get_any(), which we saw earlier.
Here we can also see that the lru in pingora's connection pool is used solely for evicting idle connections: as soon as a connection is handed out, its entry is popped from the lru.
/// Get a connection from this pool under the same group key
pub fn get(&self, key: &GroupKey) -> Option<S> {
let pool_node = {
let pool = self.pool.read();
match pool.get(key) {
Some(v) => (*v).clone(),
None => return None,
}
}; // read lock released here
if let Some((id, connection)) = pool_node.get_any() {
self.lru.pop(&id); // the notified is not needed
Some(connection.release())
} else {
None
}
}
4.3. idle_poll
This is one of only two async methods in the top-level interface of the pingora connection pool. It runs a tokio::select over three futures:
- watch_use, the receiving end of a channel. As mentioned earlier, putting a connection into the pool yields a channel receiver, while taking a connection out of the pool calls its release() method, which sends a message through the channel's sender. So if this future becomes ready, the connection has been taken out of the pool and is in use by a request; the idle check is no longer needed and we simply return.
- notify_evicted. This future becoming ready means the connection has been evicted from the LRU, so the idle check is likewise no longer needed and we return.
- read_result, which calls read_with_timeout on the connection, performing a single read with a timeout. The reason for the timeout is clear: a kept-alive connection must always be torn down after some amount of time, whether because of HTTP keepalive or TCP keepalive. If the read returns anything, it is either EOF, meaning the peer closed the connection, or unexpected data, since the connection is idle and the peer should not be sending anything; either way the connection should be closed. So whenever the read future becomes ready the connection should be closed, and the same applies when the read times out.
From the behavior of this async method it is easy to tell that it is meant to be spawned as a separate task.
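Since the body of idle_poll itself is not quoted in this post, here is a rough, self-contained sketch of the three-way select just described; the shape and names are assumptions rather than pingora's actual code, and read_with_timeout is stood in for by a plain sleep:

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{oneshot, Notify};

// A sketch of the select logic described above.
async fn idle_poll_sketch(
    watch_use: oneshot::Receiver<bool>,
    notify_evicted: Arc<Notify>,
    idle_timeout: Duration,
) {
    // Placeholder for the real read_with_timeout() on the pooled connection.
    let read_eof_or_timeout = tokio::time::sleep(idle_timeout);
    tokio::select! {
        _ = watch_use => {
            // Picked up by a request: stop watching, keep the connection alive.
        }
        _ = notify_evicted.notified() => {
            // Evicted from the lru: proceed to close the connection.
        }
        _ = read_eof_or_timeout => {
            // EOF, unexpected data, or idle timeout: close the connection.
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel();
    let evicted = Arc::new(Notify::new());
    let handle = tokio::spawn(idle_poll_sketch(rx, evicted, Duration::from_secs(5)));
    let _ = tx.send(true); // simulate the connection being picked up
    handle.await.unwrap();
}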
Here is how the connection pool and idle_poll are used in pingora-core/src/connectors/mod.rs:
/// Return the [Stream] to the [TransportConnector] for connection reuse.
///
/// Not all TCP/TLS connections can be reused. It is the caller's responsibility to make sure
/// that protocol over the [Stream] supports connection reuse and the [Stream] itself is ready
/// to be reused.
///
/// If a [Stream] is dropped instead of being returned via this function. it will be closed.
pub fn release_stream(
&self,
mut stream: Stream,
key: u64, // usually peer.reuse_hash()
idle_timeout: Option<std::time::Duration>,
) {
if !test_reusable_stream(&mut stream) {
return;
}
let id = stream.id();
let meta = ConnectionMeta::new(key, id);
debug!("Try to keepalive client session");
let stream = Arc::new(Mutex::new(stream));
let locked_stream = stream.clone().try_lock_owned().unwrap(); // safe as we just created it
let (notify_close, watch_use) = self.connection_pool.put(&meta, stream);
let pool = self.connection_pool.clone(); //clone the arc
let rt = pingora_runtime::current_handle();
rt.spawn(async move {
pool.idle_poll(locked_stream, &meta, idle_timeout, notify_close, watch_use)
.await;
});
}
4.4. idle_timeout
This is the other async method in the connection pool's top-level interface, also used for idle management of pooled connections. The difference from idle_poll is that this method never actively checks whether the connection has broken; that is, it never tries to read from the connection. Instead, it takes an extra channel receiver as a parameter and learns that the connection has closed through notifications on that channel.
Let's look at the code first, paying attention to notify_closed:
/// Passively wait to close the connection after the timeout
///
/// If this connection is not being picked up or evicted before the timeout is reach, this
/// function will remove it from the pool and close the connection.
pub async fn idle_timeout(
&self,
meta: &ConnectionMeta,
timeout: Duration,
notify_evicted: Arc<Notify>,
mut notify_closed: watch::Receiver<bool>,
watch_use: oneshot::Receiver<bool>,
) {
tokio::select! {
biased;
_ = watch_use => {
debug!("idle connection is being picked up");
},
_ = notify_evicted.notified() => {
debug!("idle connection is being evicted");
// TODO: gracefully close the connection?
}
_ = notify_closed.changed() => {
// assume always changed from false to true
debug!("idle connection is being closed");
self.pop_closed(meta);
}
_ = sleep(timeout) => {
debug!("idle connection is being evicted");
self.pop_closed(meta);
}
};
}
So if we already have idle_poll, why do we also need an idle_timeout that never actively reads for EOF? To answer that we need to look at where this method is used. The code is in pingora-core/src/connectors/http/v2.rs:
/// Release a finished h2 stream.
///
/// This function will terminate the [Http2Session]. The corresponding h2 connection will now
/// have one more free stream to use.
///
/// The h2 connection will be closed after `idle_timeout` if it has no active streams.
pub fn release_http_session<P: Peer + Send + Sync + 'static>(
&self,
session: Http2Session,
peer: &P,
idle_timeout: Option<Duration>,
) {
let id = session.conn.id();
let reuse_hash = peer.reuse_hash();
// get a ref to the connection, which we might need below, before dropping the h2
let conn = session.conn();
// The lock here is to make sure that in_use_pool.insert() below cannot be called after
// in_use_pool.release(), which would have put the conn entry in both pools.
// It also makes sure that only one conn will trigger the conn.is_idle() condition, which
// avoids putting the same conn into the idle_pool more than once.
let locked = conn.0.release_lock.lock_arc();
// this drop() will both drop the actual stream and call the conn.release_stream()
drop(session);
// find and remove the conn stored in in_use_pool so that it could be put in the idle pool
// if necessary
let conn = self.in_use_pool.release(reuse_hash, id).unwrap_or(conn);
if conn.is_closed() {
// Already dead h2 connection
return;
}
if conn.is_idle() {
drop(locked);
let meta = ConnectionMeta {
key: reuse_hash,
id,
};
let closed = conn.0.closed.clone();
let (notify_evicted, watch_use) = self.idle_pool.put(&meta, conn);
if let Some(to) = idle_timeout {
let pool = self.idle_pool.clone(); //clone the arc
let rt = pingora_runtime::current_handle();
rt.spawn(async move {
pool.idle_timeout(&meta, to, notify_evicted, closed, watch_use)
.await;
});
}
} else {
self.in_use_pool.insert(reuse_hash, conn);
drop(locked);
}
}
This touches on pingora's HTTP/2 implementation, which deserves a long discussion of its own; we will cover it in a separate article.
5. Miscellaneous
5.1. Implementing unique connection IDs
How does pingora generate a unique ID for each connection in a simple way?
First, it defines a UniqueID trait:
/// Define how a given session/connection identifies itself.
pub trait UniqueID {
/// The ID returned should be unique among all existing connections of the same type.
/// But ID can be recycled after a connection is shutdown.
fn id(&self) -> i32;
}
And then, naturally enough, it just uses the raw file descriptor number...
impl UniqueID for Stream {
fn id(&self) -> i32 {
self.as_raw_fd()
}
}
impl UniqueID for HttpSession {
fn id(&self) -> i32 {
self.underlying_stream.id()
}
}
}