1. 共性与区别
写异步 rust 时,我们经常需要一种一次性的通知手段,用于实现线程同步。tokio 提供了两种满足上述需求的同步工具:tokio::sync::oneshot::channel
和 tokio::sync::Notify
。
二者主要有以下几点差距:
1.1. 单端与双端设计
对于 oneshot::channel
,自然地分为发送端 Sender
和接收端 Receiver
,二者职责分明。
然而,对于 Notify
,任意一个被 clone 的实例(实际上,Notify
本身不实现 Clone
, 所以一般是用 Arc
包起来),既能拿来当做发送端(notifier),也能拿来当接收端(waiter)。
let a = Arc::new(Notify::new());
let b = a.clone();
tokio::spawn(async move {
println!("a is waiting for notification...");
a.notified().await;
println!("a is notified!");
println!("a is now notifying b...");
a.notify_one();
});
println!("b is now notifying a...");
b.notify_one();
println!("b is waiting for notification...");
b.notified().await;
println!("b is notified!");
1.2. 通知方与等待方的数量
对于 oneshot::channel
,无论是 Sender
还是 Receiver
都只能有一个, clone 是不被允许的。
相反,Notify
在 Arc
以后可以任意 clone,即通知方和等待方都可以有任意个数。正因如此,Notify
提供了两种接口,有只唤醒一个等待方的 notify_one
和唤醒所有等待方的 notify_waiters
。
1.3. 携带数据
最后,显而易见地,channel
是可以携带数据的,但是 Notify
只能用来进行通知,并不能携带任何数据。
2. 简化版的实现原理
2.1. Notify
Notify
的定义如下:
pub struct Notify {
state: AtomicUsize,
waiters: Mutex<WaitList>,
}
可见基本设计思想是用一个 atomic 的 state 来记录状态,然后在其基础上做 CAS 来完成所有的状态转移。
当需要 Pending 来等待通知时,调用方将自己的 waker
放入 waiters
中保存。通知方则从中取出 waker
并调用 wake()
进行唤醒。这样一来,确实只需要一个全局实例,各方均使用 Arc
访问即可。
2.2. oneshot::channel
oneshot::channel
的真实实现稍微复杂一点点,为了简明扼要地说明原理,下面的代码去掉了一些字段:
pub struct Sender<T> {
inner: Option<Arc<Inner<T>>>,
}
pub struct Receiver<T> {
inner: Option<Arc<Inner<T>>>,
}
struct Inner<T> {
state: AtomicUsize,
value: UnsafeCell<Option<T>>,
tx_task: Task,
rx_task: Task,
}
struct Task(UnsafeCell<MaybeUninit<Waker>>);
可以看到一些共性的东西,例如也需要基于一个 atomic 来做 CAS。不过由于是双端设计,因此需要使用 Arc
来共用 Inner
,并且 Inner
中需要保存 Receiver
和 Sender
各自的 waker。
2.3. 理论上的性能差距
有一个问题需要注意:Notify
支持多个等待唤醒方,其代码实现使用了链表来存放这些等待方的 waker ,因此在弹出 waker 进行唤醒时,是无法避免要加锁的。我们来看一下 notify_with_strategy
方法,这是在调用 Notify::notify_one
时实际上调用的底层函数。
fn notify_with_strategy(&self, strategy: NotifyOneStrategy) {
// Load the current state
let mut curr = self.state.load(SeqCst);
// If the state is `EMPTY`, transition to `NOTIFIED` and return.
while let EMPTY | NOTIFIED = get_state(curr) {
// The compare-exchange from `NOTIFIED` -> `NOTIFIED` is intended. A
// happens-before synchronization must happen between this atomic
// operation and a task calling `notified().await`.
let new = set_state(curr, NOTIFIED);
let res = self.state.compare_exchange(curr, new, SeqCst, SeqCst);
match res {
// No waiters, no further work to do
Ok(_) => return,
Err(actual) => {
curr = actual;
}
}
}
// There are waiters, the lock must be acquired to notify.
let mut waiters = self.waiters.lock();
// The state must be reloaded while the lock is held. The state may only
// transition out of WAITING while the lock is held.
curr = self.state.load(SeqCst);
if let Some(waker) = notify_locked(&mut waiters, &self.state, curr, strategy) {
drop(waiters);
waker.wake();
}
}
可见在弹出 waker 时,使用了 self.waiters.lock()
。
而对于 oneshot ,尽管逻辑较为复杂,但是其底层实现是完全无锁的,因此二者性能孰高孰低还不好定论。
3. 性能测试
在面向网络、数据库等场景的应用中,连接池是常见的性能优化方案。设计连接池时,一般都需要感知连接的存活情况,一旦发现连接被对端关闭,我们便需要做一些对应的处理。
在用 rust 实现的连接池中,连接本身通常会被写成 future,并被用于 tokio::spawn
。当这个 future 感知到连接断开,则需要用一些手段来通知其他方面,此时我们便需要在 Notify
和 oneshot::channel
中做出选择。
下面我们简单写个 benchmark,用 tokio::sleep
模拟一定时间后的连接断开。
3.1. 代码清单
主程序 main.rs:
use futures::{future::join_all, FutureExt};
use tokio::{
sync::oneshot::{channel, Receiver, Sender},
sync::Notify,
time::sleep,
};
use std::{
future::{poll_fn, Future},
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::{Duration, Instant},
};
struct NotifyTester {
sleep: Pin<Box<tokio::time::Sleep>>,
drop_notifier: Arc<Notify>,
}
impl NotifyTester {
fn start(not: Arc<Notify>) {
tokio::pin!(sleep);
let tester = Self {
sleep: Box::pin(sleep(Duration::from_secs(5))),
drop_notifier: not,
};
tokio::spawn(tester);
}
}
impl Future for NotifyTester {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
this.sleep.poll_unpin(cx)
}
}
impl Drop for NotifyTester {
fn drop(&mut self) {
self.drop_notifier.notify_one();
}
}
pub(crate) async fn test_notify(concurrent: usize) {
let mut joins = Vec::new();
for _ in 0..concurrent {
let notify = Arc::new(Notify::new());
NotifyTester::start(notify.clone());
joins.push(tokio::spawn(async move { notify.notified().await }));
}
join_all(joins.into_iter()).await;
}
struct ChannelTester {
sleep: Pin<Box<tokio::time::Sleep>>,
_drop_notifier: Sender<()>,
}
impl ChannelTester {
fn start() -> Receiver<()> {
let (tx, rx) = channel();
let tester = Self {
sleep: Box::pin(sleep(Duration::from_secs(5))),
_drop_notifier: tx,
};
tokio::spawn(tester);
rx
}
}
impl Future for ChannelTester {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
this.sleep.poll_unpin(cx)
}
}
pub(crate) async fn test_channel(concurrent: usize) {
let mut joins = Vec::new();
for _ in 0..concurrent {
let mut rx = ChannelTester::start();
joins.push(tokio::spawn(poll_fn(move |cx| rx.poll_unpin(cx))));
}
join_all(joins.into_iter()).await;
}
const CONCURRENT_COUNT: u32 = 500_000;
#[tokio::main]
async fn main() {
// let now = Instant::now();
// test_notify(CONCURRENT_COUNT as usize).await;
// let elapsed = now.elapsed();
// println!("notify need {elapsed:?}");
let now = Instant::now();
test_channel(CONCURRENT_COUNT as usize).await;
let elapsed = now.elapsed();
println!("oneshot_channel need {elapsed:?}");
}
3.2. 测试结果
由于我们在代码中模拟连接保活时 sleep 了 5s,因此最后计算耗时时需要减去。实验结果如下:
实例个数 | Notify 耗时 | oneshot channel 耗时 |
---|---|---|
10000 | 21ms | 19ms |
50000 | 93ms | 94ms |
100000 | 179ms | 192ms |
500000 | 941ms | 944ms |
1000000 | 1816ms | 1904ms |
3.3. 结论
实验可见,二者的性能其实比较接近。因此,在二者间做出选择时,主要还是根据应用场景的需求来判断,例如是否需要在通知时携带数据、是否需要多个等待 / 唤醒方。