{"id":364,"date":"2024-07-06T23:09:36","date_gmt":"2024-07-06T15:09:36","guid":{"rendered":"https:\/\/cococat.top\/?p=364"},"modified":"2024-07-06T23:09:36","modified_gmt":"2024-07-06T15:09:36","slug":"pingora-connection-pool","status":"publish","type":"post","link":"https:\/\/cococat.top\/index.php\/2024\/07\/06\/pingora-connection-pool\/","title":{"rendered":"pingora notes 1: connection pool"},"content":{"rendered":"<h2>1. Basic structure<\/h2>\n<p>The entry type <code>ConnectionPool<\/code> consists of a HashMap, <code>pool<\/code>, and an LRU, <code>lru<\/code> (of type <code>Lru<\/code>).<\/p>\n<pre><code class=\"language-rust\">\/\/\/ Connection pool\n\/\/\/\n\/\/\/ [ConnectionPool] holds reusable connections. A reusable connection is released to this pool to\n\/\/\/ be picked up by another user\/request.\npub struct ConnectionPool&lt;S&gt; {\n    \/\/ TODO: n-way pools to reduce lock contention\n    pool: RwLock&lt;HashMap&lt;GroupKey, Arc&lt;PoolNode&lt;PoolConnection&lt;S&gt;&gt;&gt;&gt;&gt;,\n    lru: Lru&lt;ID, ConnectionMeta&gt;,\n}<\/code><\/pre>\n<p>Here, the key of <code>pool<\/code> is <code>GroupKey<\/code>, which is just a u64; the value is a <code>PoolNode<\/code> wrapped in an Arc.<br \/>\nThe key type of <code>lru<\/code>, <code>ID<\/code>, is an i32, and its value type <code>ConnectionMeta<\/code> is defined as follows:<\/p>\n<pre><code class=\"language-rust\">\/\/\/ the metadata of a connection\n#[derive(Clone, Debug)]\npub struct ConnectionMeta {\n    \/\/\/ The group key. 
All connections under the same key are considered the same for connection reuse.\n    pub key: GroupKey,\n    \/\/\/ The unique ID of a connection.\n    pub id: ID,\n}<\/code><\/pre>\n<p>So it is effectively the key of <code>pool<\/code> plus the key of <code>lru<\/code>. Let's distinguish the two keys:<\/p>\n<ul>\n<li>GroupKey: the key in <code>pool<\/code>. When we need to send an HTTP request and several existing TCP connections could serve it, those connections should belong to the same host and share the same upstream socket address. When such connections are stored in the pool they should have the same <code>GroupKey<\/code>, so <code>GroupKey<\/code> is most likely computed by hashing values such as the host and the upstream socket address.<\/li>\n<li>id: the key in <code>lru<\/code>, which identifies one specific connection.<\/li>\n<\/ul>\n<p>The design idea is therefore fairly clear: <code>pool<\/code> stores all kept-alive connections, and <code>lru<\/code> implements least-recently-used eviction. That is why <code>lru<\/code> maps its own key to the key of <code>pool<\/code>: on eviction, this lets the code locate the right <code>PoolNode<\/code> and remove the no-longer-needed connection from it.<\/p>\n<p>Below, we start from the details and look at how <code>pool<\/code> and <code>lru<\/code> are each designed, then return to <code>ConnectionPool<\/code> itself and look at the outermost interface.<\/p>\n<h2>2. lru design<\/h2>\n<p>Let's look at the struct definitions first.<\/p>\n<pre><code class=\"language-rust\">pub struct Lru&lt;K, T&gt;\nwhere\n    K: Send,\n    T: Send,\n{\n    lru: RwLock&lt;ThreadLocal&lt;RefCell&lt;LruCache&lt;K, Node&lt;T&gt;&gt;&gt;&gt;&gt;,\n    size: usize,\n    drain: AtomicBool,\n}\n\npub struct Node&lt;T&gt; {\n    pub close_notifier: Arc&lt;Notify&gt;,\n    pub meta: T,\n}<\/code><\/pre>\n<p>The design is simple: it is a thread-local wrapper around <code>LruCache<\/code>, a structure provided by the crate of the same name.<\/p>\n<h3>2.1. close_notifier<\/h3>\n<p>Each lru node carries an extra <code>Arc&lt;Notify&gt;<\/code>, a synchronization primitive provided by tokio. Unlike another common notification tool, the oneshot channel, <code>Notify<\/code> itself cannot carry data of any kind.<\/p>\n<p>The implementation of <code>Node<\/code> includes a <code>notify_close()<\/code> method:<\/p>\n<pre><code class=\"language-rust\">impl&lt;T&gt; Node&lt;T&gt; {\n    \/\/ ... 
omitted\n    pub fn notify_close(&amp;self) {\n        self.close_notifier.notify_one();\n    }\n}<\/code><\/pre>\n<p>Clearly, this is used to send a notification when a <code>Node<\/code> is evicted by the LRU algorithm. As we will see later, in the pingora connection pool this notification sends the evicted connection into its teardown path.<\/p>\n<h3>2.2. The lru add operation<\/h3>\n<p>The add operation is also straightforward: it creates an lru node and returns its close notifier. If an eviction occurs, it calls <code>notify_close()<\/code> on the evicted node and returns that node's metadata as well.<\/p>\n<pre><code class=\"language-rust\">pub fn add(&amp;self, key: K, meta: T) -&gt; (Arc&lt;Notify&gt;, Option&lt;T&gt;) {\n    let node = Node::new(meta);\n    let notifier = node.close_notifier.clone();\n    \/\/ TODO: check if the key is already in it\n    (notifier, self.put(key, node))\n}\n\n\/\/ put a node in and return the meta of the replaced node\npub fn put(&amp;self, key: K, value: Node&lt;T&gt;) -&gt; Option&lt;T&gt; {\n    if self.drain.load(Relaxed) {\n        value.notify_close(); \/\/ sort of hack to simulate being evicted right away\n        return None;\n    }\n    let lru = self.lru.read(); \/* read lock *\/\n    let lru_cache = &amp;mut *(lru\n        .get_or(|| RefCell::new(LruCache::unbounded()))\n        .borrow_mut());\n    lru_cache.put(key, value);\n    if lru_cache.len() &gt; self.size {\n        match lru_cache.pop_lru() {\n            Some((_, v)) =&gt; {\n                \/\/ TODO: drop the lock here?\n                v.notify_close();\n                return Some(v.meta);\n            }\n            
None =&gt; return None,\n        }\n    }\n    None\n    \/* read lock dropped *\/\n}<\/code><\/pre>\n<p>The <code>drain<\/code> flag above belongs to a reserved feature that the connection pool does not actually use; its purpose is to evict every node in the lru. When that process starts, the atomic <code>self.drain<\/code> is set to true so that all further add operations are rejected.<\/p>\n<p>Since the code provides no way to set <code>self.drain<\/code> back to false, an lru that has been drained can never be used again, so this interface has no practical use for the pool.<\/p>\n<h2>3. pool design<\/h2>\n<p>As mentioned earlier, <code>pool<\/code> is a HashMap whose key is a u64 and whose value type is <code>Arc&lt;PoolNode&lt;PoolConnection&lt;S&gt;&gt;&gt;<\/code>.<\/p>\n<h3>3.1. 
<code>PoolNode<\/code> design<\/h3>\n<p><code>PoolNode<\/code> consists of a hot queue, <code>hot_queue<\/code>, and a HashMap, <code>connections<\/code>, that holds all remaining non-hot connections.<br \/>\nNote that <code>hot_queue<\/code>, which comes from the crossbeam_queue crate, is itself lock-free, but removing a specific id from the queue still requires a lock so that two evictions do not race on the queue and try to remove the same id twice.<\/p>\n<pre><code class=\"language-rust\">\/\/\/ A pool of exchangeable items\npub struct PoolNode&lt;T&gt; {\n    connections: Mutex&lt;HashMap&lt;ID, T&gt;&gt;,\n    \/\/ a small lock free queue to avoid lock contention\n    hot_queue: ArrayQueue&lt;(ID, T)&gt;,\n    \/\/ to avoid race between 2 evictions on the queue\n    hot_queue_remove_lock: Mutex&lt;()&gt;,\n    \/\/ TODO: store the GroupKey to avoid hash collision?\n}<\/code><\/pre>\n<p>With <code>hot_queue<\/code> in place, getting a connection first tries to pop a hot connection and falls back to <code>connections<\/code> only if that fails:<\/p>\n<pre><code class=\"language-rust\">\/\/\/ Get any item from the pool\npub fn get_any(&amp;self) -&gt; Option&lt;(ID, T)&gt; {\n    let hot_conn = self.hot_queue.pop();\n    if hot_conn.is_some() {\n        return hot_conn;\n    }\n    let mut connections = self.connections.lock();\n    \/\/ find one connection, any connection will do\n    let id = match connections.iter().next() {\n        Some((k, _)) =&gt; *k, \/\/ OK to copy i32\n        None =&gt; return None,\n    };\n    \/\/ unwrap is safe since we just found it\n    let connection = 
connections.remove(&amp;id).unwrap();\n    \/* NOTE: we don&#039;t resize or drop empty connections hashmap\n        * We may want to do it if they consume too much memory\n        * maybe we should use trees to save memory *\/\n    Some((id, connection))\n    \/\/ connections.lock released here\n}<\/code><\/pre>\n<p>The interesting part of this design is that <code>hot_queue<\/code> itself is lock-free, so it can significantly reduce lock contention on <code>connections<\/code>.<\/p>\n<p>When a connection is put into the pool, it is likewise pushed to <code>hot_queue<\/code> first. If the queue is full, <code>push<\/code> fails and hands the same connection back, and that connection is then stored in the HashMap instead; nothing already in the hot queue is displaced.<\/p>\n<pre><code class=\"language-rust\">\/\/\/ Insert an item with the given unique ID into the pool\npub fn insert(&amp;self, id: ID, conn: T) {\n    if let Err(node) = self.hot_queue.push((id, conn)) {\n        \/\/ hot queue is full\n        let mut connections = self.connections.lock();\n        connections.insert(node.0, node.1); \/\/ TODO: check dup\n    }\n}<\/code><\/pre>\n<p>When the lru evicts a connection and the connection with that id has to be removed from <code>PoolNode<\/code>, the logic is to lock the HashMap and look the id up there first (least recently used connections are likely to be there), and only then take the removal lock and scan <code>hot_queue<\/code>.<\/p>\n<pre><code class=\"language-rust\">\/\/ This function acquires 2 locks and iterates over the entire hot queue.\n\/\/ But it should be fine because remove() rarely happens on a busy PoolNode.\n\/\/\/ Remove the item associated with the id from the pool. 
The item is returned\n\/\/\/ if it is found and removed.\npub fn remove(&amp;self, id: ID) -&gt; Option&lt;T&gt; {\n    \/\/ check the table first as least recent used ones are likely there\n    let removed = self.connections.lock().remove(&amp;id);\n    if removed.is_some() {\n        return removed;\n    } \/\/ lock drops here\n\n    let _queue_lock = self.hot_queue_remove_lock.lock();\n    \/\/ check the hot queue, note that the queue can be accessed in parallel by insert and get\n    let max_len = self.hot_queue.len();\n    for _ in 0..max_len {\n        if let Some((conn_id, conn)) = self.hot_queue.pop() {\n            if conn_id == id {\n                \/\/ this is the item, it is already popped\n                return Some(conn);\n            } else {\n                \/\/ not this item, put back to hot queue, but it could also be full\n                self.insert(conn_id, conn);\n            }\n        } else {\n            \/\/ other threads grab all the connections\n            return None;\n        }\n    }\n    None\n    \/\/ _queue_lock drops here\n}<\/code><\/pre>\n<h3>3.2. 
<code>PoolConnection<\/code> design<\/h3>\n<p>This one is simple: it mainly adds a oneshot channel used to send a notification in certain situations. As we will see later, the actual purpose of <code>release()<\/code> is that when a connection is taken out of the pool, this channel is used to stop that connection's idle timeout watcher.<\/p>\n<pre><code class=\"language-rust\">struct PoolConnection&lt;S&gt; {\n    pub notify_use: oneshot::Sender&lt;bool&gt;,\n    pub connection: S,\n}\n\nimpl&lt;S&gt; PoolConnection&lt;S&gt; {\n    pub fn new(notify_use: oneshot::Sender&lt;bool&gt;, connection: S) -&gt; Self {\n        PoolConnection {\n            notify_use,\n            connection,\n        }\n    }\n\n    pub fn release(self) -&gt; S {\n        \/\/ notify the idle watcher to release the connection\n        let _ = self.notify_use.send(true);\n        \/\/ wait for the watcher to release\n        self.connection\n    }\n}\n<\/code><\/pre>\n<h2>4. Top-level interface: <code>ConnectionPool<\/code> design<\/h2>\n<h3>4.1. 
Storing a connection<\/h3>\n<p>There is nothing surprising in the logic:<\/p>\n<ol>\n<li>First record the connection in the lru<\/li>\n<li>If the lru evicted something, remove the corresponding connection from the pool<\/li>\n<li>Use the <code>GroupKey<\/code> to get the corresponding <code>PoolNode<\/code> and store the connection in it<\/li>\n<li>Return two values: the Notify that fires when the connection is evicted, and the channel receiver that notifies the idle watcher when the connection is taken out\n<pre><code class=\"language-rust\">\/\/\/ Release a connection to this pool for reuse\n\/\/\/\n\/\/\/ - The returned [`Arc&lt;Notify&gt;`] will notify any listen when the connection is evicted from the pool.\n\/\/\/ - The returned [`oneshot::Receiver&lt;bool&gt;`] will notify when the connection is being picked up by [Self::get()].\npub fn put(\n    &amp;self,\n    meta: &amp;ConnectionMeta,\n    connection: S,\n) -&gt; (Arc&lt;Notify&gt;, oneshot::Receiver&lt;bool&gt;) {\n    let (notify_close, replaced) = self.lru.add(meta.id, meta.clone());\n    if let Some(meta) = replaced {\n        self.pop_evicted(&amp;meta);\n    };\n    let pool_node = self.get_pool_node(meta.key);\n    let (notify_use, watch_use) = oneshot::channel();\n    let connection = PoolConnection::new(notify_use, connection);\n    pool_node.insert(meta.id, connection);\n    (notify_close, watch_use)\n}<\/code><\/pre>\n<\/li>\n<\/ol>\n<p>Pay a little attention to how the corresponding <code>PoolNode<\/code> is fetched: when no new node has to be created, only the read lock is taken, which improves performance.<\/p>\n<pre><code class=\"language-rust\">\/* get or create and insert a pool node for the hash key *\/\nfn get_pool_node(&amp;self, key: GroupKey) 
-&gt; Arc&lt;PoolNode&lt;PoolConnection&lt;S&gt;&gt;&gt; {\n    {\n        let pool = self.pool.read();\n        if let Some(v) = pool.get(&amp;key) {\n            return (*v).clone();\n        }\n    } \/\/ read lock released here\n\n    {\n        \/\/ write lock section\n        let mut pool = self.pool.write();\n        \/\/ check again since another task might have already added it\n        if let Some(v) = pool.get(&amp;key) {\n            return (*v).clone();\n        }\n        let node = Arc::new(PoolNode::new());\n        let node_ret = node.clone();\n        pool.insert(key, node); \/\/ TODO: check dup\n        node_ret\n    }\n}<\/code><\/pre>\n<h3>4.2. Taking a connection out<\/h3>\n<p>No surprises here either: find the node through the group key, then call the <code>get_any()<\/code> we saw earlier.<br \/>\nNote that the lru in the pingora connection pool is only used to evict idle connections: a connection that is taken out is also popped from the lru.<\/p>\n<pre><code class=\"language-rust\">\/\/\/ Get a connection from this pool under the same group key\npub fn get(&amp;self, key: &amp;GroupKey) -&gt; Option&lt;S&gt; {\n    let pool_node = {\n        let pool = self.pool.read();\n        match pool.get(key) {\n            Some(v) =&gt; (*v).clone(),\n            None =&gt; return None,\n        }\n    }; \/\/ read lock released here\n\n    if let Some((id, connection)) = pool_node.get_any() {\n        self.lru.pop(&amp;id); \/\/ the notified is not needed\n        Some(connection.release())\n    } else {\n        None\n    }\n}<\/code><\/pre>\n<h3>4.3. 
<code>idle_poll<\/code><\/h3>\n<p>Among the top-level interfaces of the pingora connection pool, this is one of only two async methods. It runs <code>tokio::select<\/code> over three futures:<\/p>\n<ol>\n<li><code>watch_use<\/code>, the receiving end of a channel. As mentioned above, putting a connection into the pool returns a channel receiver, and taking a connection out calls its <code>release()<\/code> method, which sends a message through the channel's sender. So if this future is ready, the connection has been taken from the pool and is being used by a requester; the idle check is no longer needed and the method simply returns.<\/li>\n<li><code>notify_evicted<\/code>: when this future is ready, the connection has been evicted from the LRU, so the idle check is not needed either and the method simply returns.<\/li>\n<li><code>read_result<\/code>, which calls <code>read_with_timeout<\/code> on the connection to perform one read with a timeout. The reason for the timeout is clear: a kept-alive connection always has to be dropped after some time, whether because of HTTP keepalive or TCP keepalive. If the read returns something, it is either EOF, meaning the peer closed the connection, or unexpected data: the connection is currently idle and should not receive unsolicited messages from the peer, so it should be closed as well. Therefore, as soon as the read future is ready, the connection should be closed. And if the read times out, it should likewise be closed.<\/li>\n<\/ol>\n<p>From the behavior of this async method it is easy to tell that it is meant to be spawned as a task.<\/p>\n<p>Here is how pingora-core\/src\/connectors\/mod.rs uses the connection pool and <code>idle_poll<\/code>:<\/p>\n<pre><code class=\"language-rust\">\/\/\/ Return the [Stream] to the [TransportConnector] for connection reuse.\n\/\/\/\n\/\/\/ Not all TCP\/TLS connections can be reused. It is the caller&#039;s responsibility to make sure\n\/\/\/ that protocol over the [Stream] supports connection reuse and the [Stream] itself is ready\n\/\/\/ to be reused.\n\/\/\/\n\/\/\/ If a [Stream] is dropped instead of being returned via this function. 
it will be closed.\npub fn release_stream(\n    &amp;self,\n    mut stream: Stream,\n    key: u64, \/\/ usually peer.reuse_hash()\n    idle_timeout: Option&lt;std::time::Duration&gt;,\n) {\n    if !test_reusable_stream(&amp;mut stream) {\n        return;\n    }\n    let id = stream.id();\n    let meta = ConnectionMeta::new(key, id);\n    debug!(&quot;Try to keepalive client session&quot;);\n    let stream = Arc::new(Mutex::new(stream));\n    let locked_stream = stream.clone().try_lock_owned().unwrap(); \/\/ safe as we just created it\n    let (notify_close, watch_use) = self.connection_pool.put(&amp;meta, stream);\n    let pool = self.connection_pool.clone(); \/\/clone the arc\n    let rt = pingora_runtime::current_handle();\n    rt.spawn(async move {\n        pool.idle_poll(locked_stream, &amp;meta, idle_timeout, notify_close, watch_use)\n            .await;\n    });\n}<\/code><\/pre>\n<h3>4.4. <code>idle_timeout<\/code><\/h3>\n<p>This is the other async method in the pool's top-level interface, and it is likewise used to manage idle connections in the pool. It differs from <code>idle_poll<\/code> in that it does not actively check whether the connection is broken; that is, it never attempts a read on the connection. Instead, it takes an additional channel receiver as a parameter and learns from that channel's notification whether the connection has been closed.<\/p>\n<p>Here is the code; note <code>notify_closed<\/code>:<\/p>\n<pre><code class=\"language-rust\">\/\/\/ Passively wait to close the connection after the timeout\n\/\/\/\n\/\/\/ If this connection is not being picked up or 
evicted before the timeout is reach, this\n\/\/\/ function will remove it from the pool and close the connection.\npub async fn idle_timeout(\n    &amp;self,\n    meta: &amp;ConnectionMeta,\n    timeout: Duration,\n    notify_evicted: Arc&lt;Notify&gt;,\n    mut notify_closed: watch::Receiver&lt;bool&gt;,\n    watch_use: oneshot::Receiver&lt;bool&gt;,\n) {\n    tokio::select! {\n        biased;\n        _ = watch_use =&gt; {\n            debug!(&quot;idle connection is being picked up&quot;);\n        },\n        _ = notify_evicted.notified() =&gt; {\n            debug!(&quot;idle connection is being evicted&quot;);\n            \/\/ TODO: gracefully close the connection?\n        }\n        _ = notify_closed.changed() =&gt; {\n            \/\/ assume always changed from false to true\n            debug!(&quot;idle connection is being closed&quot;);\n            self.pop_closed(meta);\n        }\n        _ = sleep(timeout) =&gt; {\n            debug!(&quot;idle connection is being evicted&quot;);\n            self.pop_closed(meta);\n        }\n    };\n}<\/code><\/pre>\n<p>So, given that we already have <code>idle_poll<\/code>, why do we also need an <code>idle_timeout<\/code> that does not actively read for EOF? To answer that we need to see where this method is actually used. The code is in pingora-core\/src\/connectors\/http\/v2.rs:<\/p>\n<pre><code class=\"language-rust\">\/\/\/ Release a finished h2 stream.\n\/\/\/\n\/\/\/ This function will terminate the [Http2Session]. 
The corresponding h2 connection will now\n\/\/\/ have one more free stream to use.\n\/\/\/\n\/\/\/ The h2 connection will be closed after `idle_timeout` if it has no active streams.\npub fn release_http_session&lt;P: Peer + Send + Sync + &#039;static&gt;(\n    &amp;self,\n    session: Http2Session,\n    peer: &amp;P,\n    idle_timeout: Option&lt;Duration&gt;,\n) {\n    let id = session.conn.id();\n    let reuse_hash = peer.reuse_hash();\n    \/\/ get a ref to the connection, which we might need below, before dropping the h2\n    let conn = session.conn();\n\n    \/\/ The lock here is to make sure that in_use_pool.insert() below cannot be called after\n    \/\/ in_use_pool.release(), which would have put the conn entry in both pools.\n    \/\/ It also makes sure that only one conn will trigger the conn.is_idle() condition, which\n    \/\/ avoids putting the same conn into the idle_pool more than once.\n    let locked = conn.0.release_lock.lock_arc();\n    \/\/ this drop() will both drop the actual stream and call the conn.release_stream()\n    drop(session);\n    \/\/ find and remove the conn stored in in_use_pool so that it could be put in the idle pool\n    \/\/ if necessary\n    let conn = self.in_use_pool.release(reuse_hash, id).unwrap_or(conn);\n    if conn.is_closed() {\n        \/\/ Already dead h2 connection\n        return;\n    }\n    if conn.is_idle() {\n        drop(locked);\n        let meta = ConnectionMeta {\n            key: reuse_hash,\n            id,\n        };\n        let closed = conn.0.closed.clone();\n        let (notify_evicted, watch_use) = self.idle_pool.put(&amp;meta, conn);\n        if let Some(to) = idle_timeout {\n            let pool = self.idle_pool.clone(); \/\/clone the arc\n            let rt = pingora_runtime::current_handle();\n            rt.spawn(async move {\n                pool.idle_timeout(&amp;meta, to, notify_evicted, closed, watch_use)\n                    .await;\n            });\n        }\n    } else {\n        
self.in_use_pool.insert(reuse_hash, conn);\n        drop(locked);\n    }\n}<\/code><\/pre>\n<p>The short version, visible in the code above, is that the h2 connection exposes its own <code>closed<\/code> watch channel (<code>conn.0.closed<\/code>), which is passed in as <code>notify_closed<\/code>, so the pool does not need to read from the connection itself. The rest gets into pingora's HTTP\/2 implementation, which would be another long discussion; we will cover it in a separate article later.<\/p>\n<h2>5. Miscellaneous<\/h2>\n<h3>5.1. How the unique connection id is implemented<\/h3>\n<p>How does pingora generate a unique id for each connection in a simple way?<\/p>\n<p>First, it defines a <code>UniqueID<\/code> trait, declared as:<\/p>\n<pre><code class=\"language-rust\">\/\/\/ Define how a given session\/connection identifies itself.\npub trait UniqueID {\n    \/\/\/ The ID returned should be unique among all existing connections of the same type.\n    \/\/\/ But ID can be recycled after a connection is shutdown.\n    fn id(&amp;self) -&gt; i32;\n}<\/code><\/pre>\n<p>Then, naturally, it simply uses the connection's raw file descriptor number (not a port number), which is unique among open connections but can be reused after one is closed:<\/p>\n<pre><code class=\"language-rust\">impl UniqueID for Stream {\n    fn id(&amp;self) -&gt; i32 {\n        self.as_raw_fd()\n    }\n}\n\nimpl UniqueID for HttpSession {\n    fn id(&amp;self) -&gt; i32 {\n        self.underlying_stream.id()\n    }\n}<\/code><\/pre>\n","protected":false},"excerpt":{"rendered":"<p>1. 
Basic structure The entry type ConnectionPool consists of a HashMap pool and an lru Lr [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[8],"tags":[22,24,31],"class_list":["post-364","post","type-post","status-publish","format-standard","hentry","category-rust","tag-pingora","tag-rust","tag-31"],"_links":{"self":[{"href":"https:\/\/cococat.top\/index.php\/wp-json\/wp\/v2\/posts\/364","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/cococat.top\/index.php\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/cococat.top\/index.php\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/cococat.top\/index.php\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/cococat.top\/index.php\/wp-json\/wp\/v2\/comments?post=364"}],"version-history":[{"count":0,"href":"https:\/\/cococat.top\/index.php\/wp-json\/wp\/v2\/posts\/364\/revisions"}],"wp:attachment":[{"href":"https:\/\/cococat.top\/index.php\/wp-json\/wp\/v2\/media?parent=364"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/cococat.top\/index.php\/wp-json\/wp\/v2\/categories?post=364"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/cococat.top\/index.php\/wp-json\/wp\/v2\/tags?post=364"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}