rust 笔记 6:Notify vs oneshot::channel

1. 共性与区别

写异步 rust 时,我们经常需要一种一次性的通知手段,用于实现线程同步。tokio 提供了两种满足上述需求的同步工具:tokio::sync::oneshot::channeltokio::sync::Notify

二者主要有以下几点差距:

1.1. 单端与双端设计

对于 oneshot::channel ,自然地分为发送端 Sender 和接收端 Receiver,二者职责分明。

然而,对于 Notify ,任意一个被 clone 的实例(实际上,Notify 本身不实现 Clone, 所以一般是用 Arc 包起来),既能拿来当做发送端(notifier),也能拿来当接收端(waiter)。

let a = Arc::new(Notify::new());
let b = a.clone();

tokio::spawn(async move {
    println!("a is waiting for notification...");
    a.notified().await;
    println!("a is notified!");
    println!("a is now notifying b...");
    a.notify_one();
});

println!("b is now notifying a...");
b.notify_one();
println!("b is waiting for notification...");
b.notified().await;
println!("b is notified!");

1.2. 通知方与等待方的数量

对于 oneshot::channel,无论是 Sender 还是 Receiver 都只能有一个, clone 是不被允许的。

相反,NotifyArc 以后可以任意 clone,即通知方和等待方都可以有任意个数。正因如此,Notify 提供了两种接口,有只唤醒一个等待方的 notify_one 和唤醒所有等待方的 notify_waiters

1.3. 携带数据

最后,显而易见地,channel 是可以携带数据的,但是 Notify 只能用来进行通知,并不能携带任何数据。

2. 简化版的实现原理

2.1. Notify

Notify 的定义如下:

pub struct Notify {
    state: AtomicUsize,
    waiters: Mutex<WaitList>,
}

可见基本设计思想是用一个 atomic 的 state 来记录状态,然后在其基础上做 CAS 来完成所有的状态转移。

当需要 Pending 来等待通知时,调用方将自己的 waker 放入 waiters 中保存。通知方则从中取出 waker 并调用 wake() 进行唤醒。这样一来,确实只需要一个全局实例,各方均使用 Arc 访问即可。

2.2. oneshot::channel

oneshot::channel 的真实实现稍微复杂一点点,为了简明扼要地说明原理,下面的代码去掉了一些字段:

pub struct Sender<T> {
    inner: Option<Arc<Inner<T>>>,
}

pub struct Receiver<T> {
    inner: Option<Arc<Inner<T>>>,
}

struct Inner<T> {
    state: AtomicUsize,
    value: UnsafeCell<Option<T>>,
    tx_task: Task,
    rx_task: Task,
}

struct Task(UnsafeCell<MaybeUninit<Waker>>);

可以看到一些共性的东西,例如也需要基于一个 atomic 来做 CAS。不过由于是双端设计,因此需要使用 Arc 来共用 Inner,并且 Inner 中需要保存 ReceiverSender 各自的 waker。

2.3. 理论上的性能差距

有一个问题需要注意:Notify 支持多个等待唤醒方,其代码实现使用了链表来存放这些等待方的 waker ,因此在弹出 waker 进行唤醒时,是无法避免要加锁的。我们来看一下 notify_with_strategy 方法,这是在调用 Notify::notify_one 时实际上调用的底层函数。

fn notify_with_strategy(&self, strategy: NotifyOneStrategy) {
    // Load the current state
    let mut curr = self.state.load(SeqCst);

    // If the state is `EMPTY`, transition to `NOTIFIED` and return.
    while let EMPTY | NOTIFIED = get_state(curr) {
        // The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A
        // happens-before synchronization must happen between this atomic
        // operation and a task calling `notified().await`.
        let new = set_state(curr, NOTIFIED);
        let res = self.state.compare_exchange(curr, new, SeqCst, SeqCst);

        match res {
            // No waiters, no further work to do
            Ok(_) => return,
            Err(actual) => {
                curr = actual;
            }
        }
    }

    // There are waiters, the lock must be acquired to notify.
    let mut waiters = self.waiters.lock();

    // The state must be reloaded while the lock is held. The state may only
    // transition out of WAITING while the lock is held.
    curr = self.state.load(SeqCst);

    if let Some(waker) = notify_locked(&mut waiters, &self.state, curr, strategy) {
        drop(waiters);
        waker.wake();
    }
}

可见在弹出 waker 时,使用了 self.waiters.lock()

而对于 oneshot ,尽管逻辑较为复杂,但是其底层实现是完全无锁的,因此二者性能孰高孰低还不好定论。

3. 性能测试

在面向网络、数据库等场景的应用中,连接池是常见的性能优化方案。设计连接池时,一般都需要感知连接的存活情况,一旦发现连接被对端关闭,我们便需要做一些对应的处理。

在用 rust 实现的连接池中,连接本身通常会被写成 future,并被用于 tokio::spawn。当这个 future 感知到连接断开,则需要用一些手段来通知其他方面,此时我们便需要在 Notifyoneshot::channel 中做出选择。

下面我们简单写个 benchmark,用 tokio::sleep 模拟一定时间后的连接断开。

3.1. 代码清单

主程序 main.rs:

use futures::{future::join_all, FutureExt};
use tokio::{
    sync::oneshot::{channel, Receiver, Sender},
    sync::Notify,
    time::sleep,
};

use std::{
    future::{poll_fn, Future},
    pin::Pin,
    sync::Arc,
    task::{Context, Poll},
    time::{Duration, Instant},
};

struct NotifyTester {
    sleep: Pin<Box<tokio::time::Sleep>>,
    drop_notifier: Arc<Notify>,
}

impl NotifyTester {
    fn start(not: Arc<Notify>) {
        tokio::pin!(sleep);
        let tester = Self {
            sleep: Box::pin(sleep(Duration::from_secs(5))),
            drop_notifier: not,
        };
        tokio::spawn(tester);
    }
}

impl Future for NotifyTester {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        this.sleep.poll_unpin(cx)
    }
}

impl Drop for NotifyTester {
    fn drop(&mut self) {
        self.drop_notifier.notify_one();
    }
}

pub(crate) async fn test_notify(concurrent: usize) {
    let mut joins = Vec::new();
    for _ in 0..concurrent {
        let notify = Arc::new(Notify::new());

        NotifyTester::start(notify.clone());
        joins.push(tokio::spawn(async move { notify.notified().await }));
    }
    join_all(joins.into_iter()).await;
}

struct ChannelTester {
    sleep: Pin<Box<tokio::time::Sleep>>,
    _drop_notifier: Sender<()>,
}

impl ChannelTester {
    fn start() -> Receiver<()> {
        let (tx, rx) = channel();
        let tester = Self {
            sleep: Box::pin(sleep(Duration::from_secs(5))),
            _drop_notifier: tx,
        };
        tokio::spawn(tester);
        rx
    }
}

impl Future for ChannelTester {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();
        this.sleep.poll_unpin(cx)
    }
}

pub(crate) async fn test_channel(concurrent: usize) {
    let mut joins = Vec::new();
    for _ in 0..concurrent {
        let mut rx = ChannelTester::start();
        joins.push(tokio::spawn(poll_fn(move |cx| rx.poll_unpin(cx))));
    }
    join_all(joins.into_iter()).await;
}

const CONCURRENT_COUNT: u32 = 500_000;

#[tokio::main]
async fn main() {
    // let now = Instant::now();
    // test_notify(CONCURRENT_COUNT as usize).await;
    // let elapsed = now.elapsed();
    // println!("notify need {elapsed:?}");

    let now = Instant::now();
    test_channel(CONCURRENT_COUNT as usize).await;
    let elapsed = now.elapsed();
    println!("oneshot_channel need {elapsed:?}");
}

3.2. 测试结果

由于我们在代码中模拟连接保活时 sleep 了 5s,因此最后计算耗时时需要减去。实验结果如下:

实例个数 Notify 耗时 oneshot channel 耗时
10000 21ms 19ms
50000 93ms 94ms
100000 179ms 192ms
500000 941ms 944ms
1000000 1816ms 1904ms

3.3. 结论

实验可见,二者的性能其实比较接近。因此,在二者间做出选择时,主要还是根据应用场景的需求来判断,例如是否需要在通知时携带数据、是否需要多个等待 / 唤醒方。

知识共享许可协议
本作品采用知识共享署名-非商业性使用-相同方式共享 4.0 国际许可协议进行许可。
暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇