pingora notes, part 1: the connection pool

1. Basic structure

The entry type ConnectionPool is made up of a HashMap named pool and an Lru named lru.

/// Connection pool
///
/// [ConnectionPool] holds reusable connections. A reusable connection is released to this pool to
/// be picked up by another user/request.
pub struct ConnectionPool<S> {
    // TODO: n-way pools to reduce lock contention
    pool: RwLock<HashMap<GroupKey, Arc<PoolNode<PoolConnection<S>>>>>,
    lru: Lru<ID, ConnectionMeta>,
}

Here, the pool's key is a GroupKey, which is really just a u64, and its value is a PoolNode wrapped in an Arc.
The lru's key type ID is an i32, and its value type ConnectionMeta looks like this:

/// the metadata of a connection
#[derive(Clone, Debug)]
pub struct ConnectionMeta {
    /// The group key. All connections under the same key are considered the same for connection reuse.
    pub key: GroupKey,
    /// The unique ID of a connection.
    pub id: ID,
}

So a ConnectionMeta is really just the pool key plus the lru key. Let's tell the two keys apart:

  • GroupKey: the key used in pool. When we need to send an HTTP request and several existing TCP connections could all serve it, those connections must belong to the same host and share the same upstream socket address. When such connections are stored in the pool they should share the same GroupKey, so the GroupKey is most likely computed by hashing the host, the upstream socket address and similar values (a hypothetical sketch follows this list).
  • id: the key used in lru, identifying one specific connection.
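
To make the grouping idea concrete, here is a minimal, purely hypothetical sketch of how such a key could be derived. In pingora the value actually comes from peer.reuse_hash() (as the release_stream() snippet later shows), so the exact fields being hashed may differ; the group_key function below is made up for illustration.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::net::SocketAddr;

// Hypothetical illustration only: hash the properties that decide whether two
// connections are interchangeable, and use the result as the GroupKey (u64).
fn group_key(host: &str, upstream_addr: SocketAddr, tls: bool) -> u64 {
    let mut hasher = DefaultHasher::new();
    host.hash(&mut hasher);
    upstream_addr.hash(&mut hasher);
    tls.hash(&mut hasher); // TLS and plaintext connections must never share a group
    hasher.finish()
}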

The design intent is therefore clear: the pool stores all the keep-alive connections, and the lru implements least-recently-used eviction. That is also why the lru maps from its own key to the pool key: only then can an eviction locate the right pool entry and delete the connection that is no longer needed.

Below, we start from the details and look at how pool and lru are each designed, and then come back to ConnectionPool itself to look at the outermost interface.

2. The design of the lru

Let's start with the struct definition.

pub struct Lru<K, T>
where
    K: Send,
    T: Send,
{
    lru: RwLock<ThreadLocal<RefCell<LruCache<K, Node<T>>>>>,
    size: usize,
    drain: AtomicBool,
}

pub struct Node<T> {
    pub close_notifier: Arc<Notify>,
    pub meta: T,
}

The design is fairly simple: it is a thread-local wrapper around LruCache, a structure provided by the crate of the same name.
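
To see why this nesting works, here is a minimal sketch of the ThreadLocal + RefCell + LruCache pattern. This is not pingora's code; the struct name PerThreadLru and the concrete key/value types are made up. Each thread only touches its own cache, so a shared read lock is all that normal operations need.

use std::cell::RefCell;

use lru::LruCache;
use parking_lot::RwLock;
use thread_local::ThreadLocal;

// Sketch of a per-thread LRU: the read lock allows concurrent access from all
// threads, and the RefCell is safe because only the owning thread can reach
// its own ThreadLocal slot.
struct PerThreadLru {
    lru: RwLock<ThreadLocal<RefCell<LruCache<i32, String>>>>,
}

impl PerThreadLru {
    fn put(&self, key: i32, value: String) {
        let guard = self.lru.read(); // shared lock, no cross-thread contention
        let cache = guard.get_or(|| RefCell::new(LruCache::unbounded()));
        cache.borrow_mut().put(key, value);
    }
}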

2.1. close_notifier

Each lru node also carries an Arc<Notify>, a synchronization tool provided by tokio. Unlike another common notification tool, the oneshot style channel, Notify cannot carry data of any kind.
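
As a quick refresher on the primitive itself (a standalone toy example, unrelated to pingora's code): Notify delivers a bare wake-up, and calling notify_one() stores a permit that the next notified().await consumes even if nobody is waiting yet.

use std::sync::Arc;
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let waiter = notify.clone();

    let task = tokio::spawn(async move {
        // suspends until a permit is available
        waiter.notified().await;
        println!("got the close notification, tear the connection down");
    });

    // wakes exactly one waiter; the permit is stored if no one is waiting yet
    notify.notify_one();
    task.await.unwrap();
}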

Node 的实现中,包含了一个 notify_close() 方法:

impl<T> Node<T> {
    // ... omitted
    pub fn notify_close(&self) {
        self.close_notifier.notify_one();
    }
}

Obviously this is used to send a notification when a Node is evicted by the lru algorithm. As we will see later, in the pingora connection pool this notification sends the evicted connection into its teardown path.

2.2. The lru add operation

The add operation is also straightforward: create an lru node and return its close notifier. If an eviction happens, call notify_close() on the evicted node and return that node's data as well.

pub fn add(&self, key: K, meta: T) -> (Arc<Notify>, Option<T>) {
    let node = Node::new(meta);
    let notifier = node.close_notifier.clone();
    // TODO: check if the key is already in it
    (notifier, self.put(key, node))
}

// put a node in and return the meta of the replaced node
pub fn put(&self, key: K, value: Node<T>) -> Option<T> {
    if self.drain.load(Relaxed) {
        value.notify_close(); // sort of hack to simulate being evicted right away
        return None;
    }
    let lru = self.lru.read(); /* read lock */
    let lru_cache = &mut *(lru
        .get_or(|| RefCell::new(LruCache::unbounded()))
        .borrow_mut());
    lru_cache.put(key, value);
    if lru_cache.len() > self.size {
        match lru_cache.pop_lru() {
            Some((_, v)) => {
                // TODO: drop the lock here?
                v.notify_close();
                return Some(v.meta);
            }
            None => return None,
        }
    }
    None
    /* read lock dropped */
}

The drain shown above is a reserved feature that the connection pool never actually uses; its job is to evict every node in the lru. When that process starts, the atomic flag self.drain is set to true, which rejects every further add operation.

Since the code never provides a way to set self.drain back to false, an lru that has been drained can never be used again, so this interface has no practical purpose.
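
For completeness, here is a rough, hypothetical sketch of what the drain flow amounts to, pieced together from the fields shown above; the real pingora implementation may differ in detail.

// Hypothetical sketch of drain(): reject further puts, then pop and notify
// every node in every thread-local cache.
pub fn drain(&self) {
    self.drain.store(true, Relaxed); // all later add()/put() calls now bail out

    let mut lru = self.lru.write(); // exclusive access to the thread-local caches
    for cell in lru.iter_mut() {
        let cache = cell.get_mut();
        while let Some((_, node)) = cache.pop_lru() {
            node.notify_close(); // simulate eviction for every remaining connection
        }
    }
}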

3. The design of the pool

As mentioned above, pool is a HashMap whose key is a u64 and whose value is an Arc<PoolNode<PoolConnection<S>>>.

3.1. The design of PoolNode

A PoolNode consists of a hot queue, hot_queue, plus a HashMap, connections, holding all of the remaining non-hot connections.
Note that hot_queue, an ArrayQueue from the crossbeam_queue crate, is itself lock-free, but removing a specific id from the queue still requires a lock so that the same id cannot be removed twice.

/// A pool of exchangeable items
pub struct PoolNode<T> {
    connections: Mutex<HashMap<ID, T>>,
    // a small lock free queue to avoid lock contention
    hot_queue: ArrayQueue<(ID, T)>,
    // to avoid race between 2 evictions on the queue
    hot_queue_remove_lock: Mutex<()>,
    // TODO: store the GroupKey to avoid hash collision?
}

With hot_queue in place, an attempt to take a connection first tries to pop a hot connection; only if that fails does it fall back to connections:

/// Get any item from the pool
pub fn get_any(&self) -> Option<(ID, T)> {
    let hot_conn = self.hot_queue.pop();
    if hot_conn.is_some() {
        return hot_conn;
    }
    let mut connections = self.connections.lock();
    // find one connection, any connection will do
    let id = match connections.iter().next() {
        Some((k, _)) => *k, // OK to copy i32
        None => return None,
    };
    // unwrap is safe since we just found it
    let connection = connections.remove(&id).unwrap();
    /* NOTE: we don't resize or drop empty connections hashmap
        * We may want to do it if they consume too much memory
        * maybe we should use trees to save memory */
    Some((id, connection))
    // connections.lock released here
}

The interesting part of this design is that hot_queue itself is lock-free, which noticeably reduces lock contention on connections.
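
For reference, here is a tiny standalone demo of crossbeam's ArrayQueue, the type behind hot_queue: it has a fixed capacity, and push() hands the rejected item back when the queue is full, which is exactly the behavior PoolNode::insert relies on.

use crossbeam_queue::ArrayQueue;

fn main() {
    // a lock-free MPMC queue with capacity 2
    let q: ArrayQueue<(i32, &str)> = ArrayQueue::new(2);
    assert!(q.push((1, "conn-1")).is_ok());
    assert!(q.push((2, "conn-2")).is_ok());

    // the queue is full: the new item is returned to the caller, nothing is evicted
    assert_eq!(q.push((3, "conn-3")), Err((3, "conn-3")));

    // pop() takes from the front (FIFO) and never blocks
    assert_eq!(q.pop(), Some((1, "conn-1")));
}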

Putting a connection into the pool naturally also tries hot_queue first; if the queue is already full, the connection rejected by the push is stored in the hashmap instead.

/// Insert an item with the given unique ID into the pool
pub fn insert(&self, id: ID, conn: T) {
    if let Err(node) = self.hot_queue.push((id, conn)) {
        // hot queue is full
        let mut connections = self.connections.lock();
        connections.insert(node.0, node.1); // TODO: check dup
    }
}

When the lru evicts a connection and the entry with that id has to be removed from the PoolNode, the connections map is locked and checked first (an LRU victim is most likely there), and only then is the hot_queue locked and scanned.

// This function acquires 2 locks and iterates over the entire hot queue.
// But it should be fine because remove() rarely happens on a busy PoolNode.
/// Remove the item associated with the id from the pool. The item is returned
/// if it is found and removed.
pub fn remove(&self, id: ID) -> Option<T> {
    // check the table first as least recent used ones are likely there
    let removed = self.connections.lock().remove(&id);
    if removed.is_some() {
        return removed;
    } // lock drops here

    let _queue_lock = self.hot_queue_remove_lock.lock();
    // check the hot queue, note that the queue can be accessed in parallel by insert and get
    let max_len = self.hot_queue.len();
    for _ in 0..max_len {
        if let Some((conn_id, conn)) = self.hot_queue.pop() {
            if conn_id == id {
                // this is the item, it is already popped
                return Some(conn);
            } else {
                // not this item, put back to hot queue, but it could also be full
                self.insert(conn_id, conn);
            }
        } else {
            // other threads grab all the connections
            return None;
        }
    }
    None
    // _queue_lock drops here
}

3.2. The design of PoolConnection

This one is simple: it mainly adds a oneshot channel used to send a notification in certain situations. Later we will see that the real purpose of release() is to shut down the connection's idle-timeout watcher, via this channel, when the connection is taken out of the pool.

struct PoolConnection<S> {
    pub notify_use: oneshot::Sender<bool>,
    pub connection: S,
}

impl<S> PoolConnection<S> {
    pub fn new(notify_use: oneshot::Sender<bool>, connection: S) -> Self {
        PoolConnection {
            notify_use,
            connection,
        }
    }

    pub fn release(self) -> S {
        // notify the idle watcher to release the connection
        let _ = self.notify_use.send(true);
        // wait for the watcher to release
        self.connection
    }
}
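
As a standalone illustration of the channel half of this design (toy code, not from pingora): the pool keeps the Sender inside PoolConnection while the idle watcher holds the Receiver, and release() sending true, or the Sender simply being dropped, wakes the watcher exactly once.

use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (notify_use, watch_use) = oneshot::channel::<bool>();

    let watcher = tokio::spawn(async move {
        match watch_use.await {
            Ok(true) => println!("connection picked up, stop the idle watcher"),
            _ => println!("sender dropped or sent false: stop watching anyway"),
        }
    });

    // this is essentially what PoolConnection::release() does
    let _ = notify_use.send(true);
    watcher.await.unwrap();
}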

4. The top-level interface: the design of ConnectionPool

4.1. Putting a connection in

The logic holds few surprises:

  1. First record the connection in the lru.
  2. If the lru evicted an entry, remove the corresponding connection from the pool.
  3. Use the GroupKey to find the matching PoolNode and insert the connection into it.
  4. Return two values: the Notify that fires when the connection is evicted, and the channel used to tell the idle watcher that the connection has been picked up.

/// Release a connection to this pool for reuse
///
/// - The returned [`Arc<Notify>`] will notify any listen when the connection is evicted from the pool.
/// - The returned [`oneshot::Receiver<bool>`] will notify when the connection is being picked up by [Self::get()].
pub fn put(
    &self,
    meta: &ConnectionMeta,
    connection: S,
) -> (Arc<Notify>, oneshot::Receiver<bool>) {
    let (notify_close, replaced) = self.lru.add(meta.id, meta.clone());
    if let Some(meta) = replaced {
        self.pop_evicted(&meta);
    };
    let pool_node = self.get_pool_node(meta.key);
    let (notify_use, watch_use) = oneshot::channel();
    let connection = PoolConnection::new(notify_use, connection);
    pool_node.insert(meta.id, connection);
    (notify_close, watch_use)
}

Pay a little attention to how the corresponding PoolNode is fetched: when no new node has to be created, only the read lock is taken, which improves performance; the write-lock path re-checks the map because another task may have inserted the node in the meantime.

/* get or create and insert a pool node for the hash key */
fn get_pool_node(&self, key: GroupKey) -> Arc<PoolNode<PoolConnection<S>>> {
    {
        let pool = self.pool.read();
        if let Some(v) = pool.get(&key) {
            return (*v).clone();
        }
    } // read lock released here

    {
        // write lock section
        let mut pool = self.pool.write();
        // check again since another task might have already added it
        if let Some(v) = pool.get(&key) {
            return (*v).clone();
        }
        let node = Arc::new(PoolNode::new());
        let node_ret = node.clone();
        pool.insert(key, node); // TODO: check dup
        node_ret
    }
}

4.2. Taking a connection out

Nothing unexpected here either: look up the node by group key, then call the get_any() we saw earlier.
Note that the lru in pingora's connection pool is used only to evict idle connections.

/// Get a connection from this pool under the same group key
pub fn get(&self, key: &GroupKey) -> Option<S> {
    let pool_node = {
        let pool = self.pool.read();
        match pool.get(key) {
            Some(v) => (*v).clone(),
            None => return None,
        }
    }; // read lock released here

    if let Some((id, connection)) = pool_node.get_any() {
        self.lru.pop(&id); // the notified is not needed
        Some(connection.release())
    } else {
        None
    }
}

4.3. idle_poll

This is one of the only two async methods in the top-level interface of the pingora connection pool. It runs tokio::select! over three futures (a simplified sketch follows this list):

  1. watch_use, the receiving end of a channel. As mentioned earlier, putting a connection into the pool yields a channel receiver, and taking a connection out calls release(), which sends a message through the channel's sender. So if this future becomes ready, the connection has been taken from the pool and is in use by a request; the idle check is no longer needed and the method simply returns.
  2. notify_evicted: this future becoming ready means the connection has been evicted from the LRU, so again the idle check is no longer needed and the method returns.
  3. read_result, which calls read_with_timeout on the connection, i.e. a read with a timeout attached. The reason for the timeout is obvious: a keep-alive connection always has to be torn down after some amount of time, whether due to HTTP keepalive or TCP keepalive. If the read returns anything, it is either EOF, meaning the peer closed the connection, or unexpected data, since an idle connection should not receive unsolicited messages from the peer; in both cases the connection should be closed. So whenever the read future becomes ready the connection is closed, and the same applies when the read times out.
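
The body of idle_poll itself is not quoted in this post, so here is a simplified, hypothetical sketch of the select it performs. The name idle_poll_sketch is made up, the connection is reduced to a plain AsyncRead, and read_with_timeout is approximated with a tokio timeout; the real method also receives the ConnectionMeta, holds an OwnedMutexGuard over the stream, and removes the entry from the pool before closing it.

use std::sync::Arc;
use std::time::Duration;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio::sync::{oneshot, Notify};
use tokio::time::timeout;

// Simplified sketch only: select over "picked up", "evicted" and "read/timeout".
async fn idle_poll_sketch<S: AsyncRead + Unpin>(
    mut connection: S,
    idle_timeout: Option<Duration>,
    notify_evicted: Arc<Notify>,
    watch_use: oneshot::Receiver<bool>,
) {
    let mut buf = [0u8; 1];
    let read_or_timeout = async {
        match idle_timeout {
            Some(t) => timeout(t, connection.read(&mut buf)).await,
            None => Ok(connection.read(&mut buf).await),
        }
    };
    tokio::select! {
        biased;
        _ = watch_use => {
            // picked up by ConnectionPool::get(): the caller owns it now, nothing to do
        }
        _ = notify_evicted.notified() => {
            // evicted by the lru: the pool entry is already gone, let the stream drop
        }
        res = read_or_timeout => {
            // EOF, unexpected data, a read error or a timeout: in every case the
            // idle connection should be removed from the pool and closed
            let _ = res;
        }
    }
}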

From the behavior of this async method it is easy to tell that it is meant to be spawned off as its own task.

Here is how the connection pool and idle_poll are used in pingora-core/src/connectors/mod.rs:

/// Return the [Stream] to the [TransportConnector] for connection reuse.
///
/// Not all TCP/TLS connections can be reused. It is the caller's responsibility to make sure
/// that protocol over the [Stream] supports connection reuse and the [Stream] itself is ready
/// to be reused.
///
/// If a [Stream] is dropped instead of being returned via this function. it will be closed.
pub fn release_stream(
    &self,
    mut stream: Stream,
    key: u64, // usually peer.reuse_hash()
    idle_timeout: Option<std::time::Duration>,
) {
    if !test_reusable_stream(&mut stream) {
        return;
    }
    let id = stream.id();
    let meta = ConnectionMeta::new(key, id);
    debug!("Try to keepalive client session");
    let stream = Arc::new(Mutex::new(stream));
    let locked_stream = stream.clone().try_lock_owned().unwrap(); // safe as we just created it
    let (notify_close, watch_use) = self.connection_pool.put(&meta, stream);
    let pool = self.connection_pool.clone(); //clone the arc
    let rt = pingora_runtime::current_handle();
    rt.spawn(async move {
        pool.idle_poll(locked_stream, &meta, idle_timeout, notify_close, watch_use)
            .await;
    });
}

4.4. idle_timeout

This is the other async method in the pool's top-level interface, and it also handles idle management of pooled connections. The difference from idle_poll is that this method never actively checks whether the connection is broken, i.e. it never tries to read from the connection. Instead it takes an extra channel receiver as a parameter and relies on notifications through that channel to learn that the connection has closed.

Let's look at the code first; note the notify_closed parameter:

/// Passively wait to close the connection after the timeout
///
/// If this connection is not being picked up or evicted before the timeout is reach, this
/// function will remove it from the pool and close the connection.
pub async fn idle_timeout(
    &self,
    meta: &ConnectionMeta,
    timeout: Duration,
    notify_evicted: Arc<Notify>,
    mut notify_closed: watch::Receiver<bool>,
    watch_use: oneshot::Receiver<bool>,
) {
    tokio::select! {
        biased;
        _ = watch_use => {
            debug!("idle connection is being picked up");
        },
        _ = notify_evicted.notified() => {
            debug!("idle connection is being evicted");
            // TODO: gracefully close the connection?
        }
        _ = notify_closed.changed() => {
            // assume always changed from false to true
            debug!("idle connection is being closed");
            self.pop_closed(meta);
        }
        _ = sleep(timeout) => {
            debug!("idle connection is being evicted");
            self.pop_closed(meta);
        }
    };
}

Given that we already have idle_poll, why do we also need an idle_timeout that never actively reads for EOF? To answer that we have to look at who calls this method. The code lives in pingora-core/src/connectors/http/v2.rs:

/// Release a finished h2 stream.
///
/// This function will terminate the [Http2Session]. The corresponding h2 connection will now
/// have one more free stream to use.
///
/// The h2 connection will be closed after `idle_timeout` if it has no active streams.
pub fn release_http_session<P: Peer + Send + Sync + 'static>(
    &self,
    session: Http2Session,
    peer: &P,
    idle_timeout: Option<Duration>,
) {
    let id = session.conn.id();
    let reuse_hash = peer.reuse_hash();
    // get a ref to the connection, which we might need below, before dropping the h2
    let conn = session.conn();

    // The lock here is to make sure that in_use_pool.insert() below cannot be called after
    // in_use_pool.release(), which would have put the conn entry in both pools.
    // It also makes sure that only one conn will trigger the conn.is_idle() condition, which
    // avoids putting the same conn into the idle_pool more than once.
    let locked = conn.0.release_lock.lock_arc();
    // this drop() will both drop the actual stream and call the conn.release_stream()
    drop(session);
    // find and remove the conn stored in in_use_pool so that it could be put in the idle pool
    // if necessary
    let conn = self.in_use_pool.release(reuse_hash, id).unwrap_or(conn);
    if conn.is_closed() {
        // Already dead h2 connection
        return;
    }
    if conn.is_idle() {
        drop(locked);
        let meta = ConnectionMeta {
            key: reuse_hash,
            id,
        };
        let closed = conn.0.closed.clone();
        let (notify_evicted, watch_use) = self.idle_pool.put(&meta, conn);
        if let Some(to) = idle_timeout {
            let pool = self.idle_pool.clone(); //clone the arc
            let rt = pingora_runtime::current_handle();
            rt.spawn(async move {
                pool.idle_timeout(&meta, to, notify_evicted, closed, watch_use)
                    .await;
            });
        }
    } else {
        self.in_use_pool.insert(reuse_hash, conn);
        drop(locked);
    }
}

This touches on pingora's HTTP/2 implementation, which is a long story of its own; we will cover it in a separate post.

5. Miscellaneous

5.1. How the unique connection id is generated

How does pingora generate a globally unique id for each connection in a simple way?

First, a UniqueID trait is defined:

/// Define how a given session/connection identifies itself.
pub trait UniqueID {
    /// The ID returned should be unique among all existing connections of the same type.
    /// But ID can be recycled after a connection is shutdown.
    fn id(&self) -> i32;
}

And then, quite matter-of-factly, the raw file descriptor number is used as the id: it is unique among live connections and gets recycled after a connection is shut down, exactly as the trait documentation allows...

impl UniqueID for Stream {
    fn id(&self) -> i32 {
        self.as_raw_fd()
    }
}

impl UniqueID for HttpSession {
    fn id(&self) -> i32 {
        self.underlying_stream.id()
    }
}