异步的本质

本文最后更新于:2023年12月1日 上午

前情提要

众所周知:多进程、多线程

众也周闻:异步(Asynchronous)、协程(Coroutine)

人尽皆知:同步(Synchronous)、阻塞

我们常处于一种:清晰的混沌态;知而又不知

这就好像你知道自己拥有意识,可什么是意识

馒头卡即答:意识与枪法并重,能够在天时地利下做出正确决断,在殊死绝境中反败为胜

问天

什么是异步呢?

也许这个问题有点难,那么,什么是同步呢

可能也不好回答

让我们举一个例子吧:发送网络请求并等待响应(或者说点餐并等待上菜)

  • 同步的代码是阻塞的:作为一个普通的单线程程序,执行流将卡在wait()这一行,直到服务器发来响应,那么用户将看到一个卡死的界面
  • 异步的代码是非阻塞的:我们提交请求(sbmit)后,那行发送请求的函数代码直接返回,并不理会响应,直接执行之后的其他逻辑

那么问题来了,异步的代码直接返回,不等待响应,那么如何获取响应呢?

如何实现异步

实现机制有很多,例如前端的XMLHttpRequest采用的是多进程

多进程

主进程通过IPC(进程间通信)利用网络进程发送和管理请求,网络进程代替主线程去监控响应的返回与否,并适时通知主进程(通过消息队列)

主进程有一个事件循环(与Qt类似),会轮询消息队列,获取响应

XMLHttpRequest请求

如此一来,主渲染进程就无须等待响应,可以执行其他任务

事件循环(消息队列)

那么事件循环是什么呢,其实这也可以认为是一种异步

Qt为例,Qt GUI程序的main函数其实非常简短

1
2
3
4
5
6
7
int main(int argc, char *argv[])
{
QApplication a(argc, argv);
Widget w;
w.show();
return a.exec();
}

a.exec()最终调用了QEventLoop::exec(),其内部有一个死循环

1
2
while (!d->exit.loadAcquire())
processEvents(flags | WaitForMoreEvents | EventLoopExec);

不断地从事件队列中取出并处理事件

Qt事件循环

当你启动一个窗口程序的时候,你就已经被困在这个循环里了

你可能会感到诧异,但是仔细想想:

你的每一次点击,每一次按键,Windows系统都会封装为一个消息,而我们又在循环中监听这些消息并处理

在每一次循环中获取事件,处理事件,刷新UI,并产生新事件(包括自定义),如此往复,不就形成了一个个动人的GUI程序了嘛

这算异步吗,大体算吧,我可以开启一个Timer,每隔一段时间就去监听网络请求是否返回,同样可以实现非阻塞

这当然算异步,但是,核心不是事件循环提供了异步能力,而是系统的异步IO或者多线程在默默工作

多线程

其实多线程当然也算一种异步,我们可以把耗时任务放到一个新线程(或线程池)上,实现非阻塞效果

参考C++:std::sync

不过多线程与事件循环的异步有什么区别

线程是CPU调度的最小单位,所以线程的执行过程可能被随时打断(保存与恢复栈),这个很好理解

但是事件循环的每一个事件函数是不能被打断的,也就是同步的,所以可以称为基于同步的异步(我瞎起的名字)

Rust异步编程

我们来分析一下Rust在单线程上是如何实现异步操作的

Rust采用的是async/await模型, 该模型性能高,还能支持底层编程,同时又像线程和协程那样无需过多的改变编程模型,但有得必有失,async 模型的问题就是内部实现机制过于复杂,理解和使用起来也没有线程和协程简单

这种模型中一般都会出现Future这个词,或者JSPromise,这本质上就是一个在未来会返回的任务

RustFuture和其他语言不太一样

  • Future 在 Rust 中是惰性的,只有在被轮询(poll)时才会运行, 因此丢弃一个 future 会阻止它未来再被运行

Future是一个任务,或者说一段函数的包装,只要用async修饰函数或代码块,就会返回一个Future对象

而poll函数会非阻塞地返回该任务的当前状态Pending or Ready

1
2
3
4
5
6
async {
// 一些同步代码
fut_one.await;
// 另一些同步代码
fut_two.await;
}

这段代码会被编译器展开,自动为其实现Future接口,并将其poll函数实现为一个状态机,每一个await就是一个状态点

如此一来,我们就可以把一段异步代码拆分为几个阶段了,每个阶段内部不可打断,只有到await才会保存状态并交还控制权

看起来fut_one.await;fut_two.await;是串行的,是同步阻塞的,但其实每一个.await调用的都是poll,都只是询问状态(立即返回),空闲下来的时间其实是去轮询其他的Future任务了(由外层Executor控制,待会儿说)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
struct AsyncFuture {
fut_one: FutOne,
fut_two: FutTwo,
state: AsyncState,
}

enum AsyncState {
Start,
AwaitingFutOne,
BetweenFuts,
AwaitingFutTwo,
Done,
}

impl Future for AsyncFuture {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
match self.state {
AsyncState::Start => {
// 执行一些同步代码
self.state = AsyncState::AwaitingFutOne;
}
AsyncState::AwaitingFutOne => match self.fut_one.poll(cx) {
Poll::Ready(()) => {
// 第一个 await 完成,执行中间的同步代码
// 执行另一些同步代码
self.state = AsyncState::AwaitingFutTwo;
}
Poll::Pending => return Poll::Pending,
},
AsyncState::AwaitingFutTwo => match self.fut_two.poll(cx) {
Poll::Ready(()) => self.state = AsyncState::Done,
Poll::Pending => return Poll::Pending,
},
AsyncState::Done => return Poll::Ready(()),
}
}
}

每一个X.await都意味着X是一个异步函数,是一个Future,那么展开后的代码中,其poll函数就会被调用,用以推进任务

所以poll函数会一层层嵌套,直到一个Leaf Future(自行实现了poll来管理其Pending和Ready的状态,或者是一段完全同步的无await代码)

疑问

你们肯定很好奇,我们不断地询问(poll)任务的状态,那么谁去执行呢?

因为实际任务肯定是重量级的,是阻塞的,是消耗时间的,我们才需要用异步方式去解决对吧

那么所有poll函数都是非阻塞的,立即返回的,只是询问状态,那么这个任务怎么可能被完成(所有人都在摸鱼)

举个例子,假如我们的任务是:读取一个大文件;也就是最内层的poll函数在死循环读文件

1
2
3
4
5
6
fn poll(){
while(line = next_line()){
str += line;
}
return Ready;
}

那么,我们能从外层期望调用这个poll函数来返回这个任务的状态吗,是Pending还是Ready

那显然是不可能的,作为一个单线程,怎么可能在一个阻塞的While循环进行时,又从外部调用这个函数

一旦进入了这个poll函数,那么就必须被阻塞到文件读取完毕,才返回Ready,那么,也就谈不上什么异步了

所以其实要理解Rust是如何实现异步的,就要去思考最终的Leaf Future的poll函数是怎么写的

比如说,我每读一行就返回怎么样

1
2
3
4
5
6
fn poll(){
line = next_line()
if line == EOF return Ready;
str += line;
return Pending;
}

这就是说,外部每调用一次poll,就会推进这个任务的进度(读一行),与惰性的Future莫名很搭有没有

那么读完一行后,就会交还控制权给外部,使其可以处理其他任务

这本质上就是时间片轮转算法或者说多任务并发,与在Qt事件循环中,每读若干行就processEvents()(以处理事件队列中的其他事件,如UI刷新)并无二异

那么你说,这种多任务并发算是异步吗?这是个值得讨论的问题

但是这种方式,读取文件的总时长并没有变化,甚至从读文件开始到结束会消耗更多的时间(期间处理其他任务)

唯一的好处是,在GUI程序中,可以保证主线程(GUI线程)不被阻塞

如果你真要提高文件读取效率,为什么不开多线程呢

当然,我们在这里讨论的是单线程异步,所以上述分片策略行不太通

大家可能已经注意到了,磁盘IO和网络IO确实存在很大的不同

区别在哪呢,我们平时不都说是异步IO吗

  • 磁盘IO是真的一直在读数据,你偷懒了一秒,那么就要多花一秒去读
  • 网络IO大部分时间都在等待,等待服务器回传数据,数据可能很少,等待要好几秒;这意味着,就算你这一秒去干别的事了,也不太耽误你对数据的读取

所以,隔一段时间去读文件,和隔一段时间去监测网络数据是完全不同的概念

所以把上述的文件IO代码换成网络IO,就完全是一个可行的异步了

1
2
3
4
5
6
7
fn poll(){
if isDataOk(){ //linux中select epoll poll
readData();
return Reday;
}
return Pending;
}

因为你真的可以去干别的事情,而不耽误等待了;相当于分片并发轮询,和事件循环也是类似的

这就体现了为什么真正的单线程异步通常要依赖于操作系统提供的异步API

那么磁盘IO就没有异步API吗

能不能让系统替我读文件,读完再告诉我

当然有,如:

  • IOCP 是 Windows 提供的一种高效的线程池技术,用于处理异步 I/O 操作
  • Linux 提供了一组 POSIX 异步 I/O (AIO) 接口,由内核完成IO操作

磁盘IO是一个实实在在的负载任务,总要有人负重前行的,要么是多线程,要么是内核(假装也算线程)

所以不依赖外物,真正的单线程异步磁盘IO是很难实现的

所以真的有单线程异步吗

如果内核算一个线程,替我们完成监测、等待工作的话,那确实没有单线程异步了

不过如果我们谈论的线程是用户空间的线程的话,那么可以依赖系统异步API实现

暴论:单线程异步高度依赖系统异步API

除非你要写一个 runtime,否则你不太可能自己实现一个叶子 Future,或者更多的是针对 Leaf-Future 的封装

这也是为什么,我们很难理解Rust是如何实现单线程异步的原因,核心的操作都被封装在tokio库内部定义的叶子Future中了(要么多线程、要么系统API)

但是如果不理解叶子Future的实现原理,等于阿巴

继续Rust异步

Okay,我们继续来讲Rust是如何实现异步的

首先,为什么要用状态机

这是为了保存状态,一个异步函数内可以包括多个异步操作

1
2
3
4
async {
fut_one.await;
fut_two.await;
}

那么,假如没有状态机,我们每次进入这个异步函数(poll),都会重复执行这两个异步操作,也不知道第一个执行完没

并且,假如两个异步操作见有一些同步代码的话,也会被重复执行,那就寄了

每一个.await都是一个关键点,代表可以在这里退出与恢复异步函数

两个.await之间假如有同步代码的话,是不可以被打断的,每个状态内部都是同步的

所以,暴论:Rust异步的本质是将大同步拆分为小同步,大任务手动拆分为小任务,而每个任务是同步的(不可被打断),虽然在同步任务里调用了异步代码 hhh(好吧 当我没说

状态保存:

其实Rust异步本质上是基于无栈协程的,这个状态机简直一毛一样

你想想多线程,每个线程由CPU调度,可能被随时打断,每个线程的状态保存在自己的栈上,所以无论何时被打断都能保存状态

有栈协程也是模拟了这个过程

而基于状态机的无栈协程和Rust的异步poll,也是为了保存状态,但是只能保存特定点(.await)的状态

仅此而已

继续:那么poll函数由谁来调用呢

其实Rust语言本身只提供了最基本的异步关键字,但是没有提供运行时

所谓运行时,就是一个事件循环,去主动调用Futurepoll函数

我们可以看看第三方库Tokio的实现:

我们一般是这样使用的

1
2
3
4
#[tokio::main]
async fn main() {
println!("Hello world");
}

其实会被展开为

1
2
3
4
5
6
7
8
9
fn main() {
tokio::runtime::Builder::new_multi_thread() //可以是单线程or多线程
.enable_all()
.build()
.unwrap()
.block_on(async {
println!("Hello world");
})
}

这样看就很清楚了,block_on就跟Qt的exec一样,main函数从一开始,就进入了一个死循环

这是一个简化的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

impl Executor {
pub fn block_on<F, T, O>(&self, f: F) -> O
{
let _waker = waker_fn::waker_fn(|| {});
let cx = &mut Context::from_waker(&_waker);

EX.set(self, || {
let fut = f();
pin_utils::pin_mut!(fut);
loop {
// return if the outer future is ready
if let std::task::Poll::Ready(t) = fut.as_mut().poll(cx) {
break t;
}

// consume all tasks
while let Some(t) = self.local_queue.pop() {
let future = t.future.borrow_mut();
let w = waker(t.clone());
let mut context = Context::from_waker(&w);
let _ = Pin::new(future).as_mut().poll(&mut context);
}

// no task to execute now, it may ready
if let std::task::Poll::Ready(t) = fut.as_mut().poll(cx) {
break t;
}

// block for io
self.reactor.borrow_mut().wait();
}
})
}
}

对吧,其实就是一个事件循环,不断地从任务队列里取出Future,用poll轮询和推进

而在poll函数中,如果遇到了需要异步进行的任务(如网络IO)

就会主动将任务交给reactor(用以统一监听IO)并返回Pending:在每一次循环末尾会进行self.reactor.borrow_mut().wait();

这个可以是阻塞的,因为只有当任务队列为空(都在等待IO时)才会进入wait

所以异步与同步的区别就在于 在合适的时间做合适的事情

当IO准备完毕时,reactor会唤醒相关的Future任务(将其加入任务队列)

这样,他下次又会被调用poll函数,同时因为有状态机记录状态,可以迅速地进行IO

大概就酱

所以异步的本质是什么

是对时间的压榨,是尽量减少无谓的等待,是在合适的时间同步,是对任务的拆分,是小明倒茶问题

异步的实现方式有什么

总结起来就俩

  • 多线程(多进程也是多线程)
  • 系统异步API

Ref

Rust Tokio,运行时以及任务相关API - 知乎 (zhihu.com)

字节跳动 | Rust 异步运行时的设计与实现 - Rust精选 (rustmagazine.github.io)

ihciah/mini-rust-runtime (github.com)

future explained(1) - Hexo (rustforever.top)

Rust学习笔记-异步编程(async/await/Future) - 知乎 (zhihu.com)

async 编程入门 - Rust语言圣经(Rust Course)

无栈协程(stackless coroutine) - 知乎 (zhihu.com)

浅谈有栈协程与无栈协程 - 知乎 (zhihu.com)

一文读懂Rust的async_rust async-CSDN博客


异步的本质
https://mrbeancpp.github.io/2023/11/29/异步的本质/
作者
MrBeanC
发布于
2023年11月29日
许可协议