共计 10914 个字符,预计需要花费 28 分钟才能阅读完成。
Tokio 学习笔记(上)
1. 概述
Tokio 是一个基于 Rust 的异步运行时,专为构建高性能网络应用而设计。它提供了异步 I/O、任务调度器和工具链支持,可以与 Rust 的异步特性 (async/await
) 无缝集成。
当我们以异步方式编写应用程序时,通过减少并发任务所需的资源,开源使其更好地扩展。然而异步的 Rust 代码无法自行运行,因此需要一个运行时(Runtime)来执行它。Tokio 是 Rust 中最广泛使用的运行时库。提供了许多有用的实用工具,在编写异步代码时,不能使用 Rust 标准库中提供的 API,而是使用 Tokio 提供的 API。
优势
- Fast,快速:Tokio 基于 Rust 实现,Rust 本身就是一种系统级语言,非常快速。同时基于 async/await 语言特性上,在增加并发操作的数量上是非常高效的,从而让我们支持大量的并发任务。
- Reliable,可靠:Tokio 基于 Rust 实现,Rust 自身就是安全型语言,因此十分可靠。
- Easy,简单:Tokio 基于 async/await 特性,大大降低了编写异步应用程序的复杂性,再结合 Tokio 的工具及其生态,编写应用程序变得轻而易举。同时,Tokio 遵循标准库的命名约定,可以轻松地将仅仅使用标准库编写的代码转换为使用 Tokio 编写的代码。
- Flexible,灵活:Tokio 提供了多种运行时的变体。
multi-threaded
、work-stealing
、轻量级
、single-threaded
。每个运行时都提供了许多的 knobs,允许用户根据自己的需求进行调整。
不适用场景
- 多线程并行运行,加速 CPU 计算:Tokio 的设计主要用于 IO 绑定应用程序,其中每个单独的任务大部分时间都在等待 IO。若想实现并行运行计算,应该使用 rayon。
- 读取大量文件:操作系统通常不提供异步文件的 API,因此,在处理大量文件上也没有优势。
- 发送单个网络请求:当你不需要一次做很多事时,采用异步反而会增大系统的复杂性,此时应该使用阻塞版本。
参考资料
- 官网文档:https://tokio.rs
- Github: https://github.com/tokio-rs/tokio
2. 快速入门:Mini-Redis
Mini-Redis 是 Tokio 官方提供的一个方便学习 Tokio 的项目,因为是一个学习项目,因此不适用于实际生产环境。
安装 Mini-Redis
cargo install mini-redis
启动 Mini-Redis, --port
指定端口
mini-redis-server --port 6380
创建项目
mkdir tokio-tutorial && cd tokio-tutorial
cargo new my-redis
添加依赖
cargo add mini-redis
cargo add tokio --features=full
或者编写Cargo.toml
文件
[dependencies]
mini-redis = "0.4.1"
tokio = { version = "1.43.0", features = ["full"] }
Hello Tokio
编写代码:
use mini_redis::{client, Result};
#[tokio::main]
async fn main() -> Result<()> {
// 连接mini-redis
let mut client = client::connect("127.0.0.1:6380").await?;
// 设置值
client.set("hello", "world".into()).await?;
// 获取值
let result = client.get("hello").await?;
println!("result: {:?}", result);
Ok(())
}
使用cargo run
执行:
代码解析
let mut client = client::connect("127.0.0.1:6380").await?;
client::connect
是 mini-redis 提供的连接 API。它异步地与指定的远程地址建立了一个 TCP 连接。一旦连接建立成果,则会返回一个Client
。代码整体看起来其实是同步的,但是.await
操作符表示该操作是异步的。
大多数计算机程序的执行顺序与它的编写顺序相同。第一行执行,然后是下一行,以此类推。在同步编程中,若执行第一行时无法立即完成操作,此时就会阻塞,造成后续的操作无法继续执行。
在异步编程中,不能立即完成的操作会被暂停到后台,线程没有被阻塞,可以继续执行其它的事情。一旦操作完成,任务就会被取消暂停,继续从它离开的地方处理。
尽管异步编程可以带来更快的应用,但它往往增大了系统的复杂性,开发人员需要跟踪状态,而这一工作通常是繁琐且极易出错的。
编译时安全线程
Rust 使用一个叫做async/await
的功能实现异步编程。执行异步操作的函数都有一个async
关键字。例如在上述示例中client::connect
函数的定义如下:
pub async fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<Client> {
// The `addr` argument is passed directly to `TcpStream::connect`. This
// performs any asynchronous DNS lookup and attempts to establish the TCP
// connection. An error at either step returns an error, which is then
// bubbled up to the caller of `mini_redis` connect.
let socket = TcpStream::connect(addr).await?;
// Initialize the connection state. This allocates read/write buffers to
// perform redis protocol frame parsing.
let connection = Connection::new(socket);
Ok(Client { connection })
}
Rust 在编译时会将async fn
定义的函数转化为一个异步运行的 routine。在async fn
中对.await
的任何调用都会将控制权交还给线程。当操作在后台进行时,线程就可以做其他工作。
使用 async/await
异步函数的调用与其他 Rust 函数一样,但是在调用这些异步函数时,不会导致函数的主体部分立即执行,而是返回一个代表操作的值或 Future。要实际运行该操作时,应该在返回值上使用.await
操作符。
例如:
async fn say_world() {
println!("world");
}
#[tokio::main]
async fn main() {
let task = say_world();
println!("hello");
task.await;
}
程序的输出为:
hello
world
只有在使用.await
操作符才会实际执行say_world
函数体内容。
异步 main 函数
在上面的代码中,我们的 main 函数定义如下:
#[tokio::main]
async fn main() {
// function body
}
与传统main函数不一样:
- 使用
async
声明为异步的 - 使用注解
#[tokio::main]
使用async fn
是因为我们想进入一个异步上下文。然而,异步函数必须由一个运行时来执行,运行时包含异步任务调度、事件化 IO、计时器等,运行时不会自动启动,所以需要 main 函数启动它。
#[tokio::main]
是一个宏,它将async fn main
转化为同步的fn main
,并初始化一个运行时实例执行异步 main 函数。
例如:
#[tokio::main]
async fn main() {
println!("hello");
}
等价于
fn main() {
tokio::runtime::Runtime::new().unwrap().block_on(async {
println!("hello");
});
}
3. 实现 Mini-Redis:接收 Socket
Redis 服务器需要做的第一件事就是接收传入的 TCP Socket。要实现这一功能,可以使用tokio::net::TcpListener
绑定到监听端口6380
。然后 Socket 会在循环中被接收,每个 Socket 都会被处理然后关闭。
现在实现:读取命令,将其打印到标准输出并回复一个错误。
use mini_redis::{Connection, Frame};
use tokio::net::{TcpListener, TcpStream};
#[tokio::main]
async fn main() {
// 监听6380端口 等待socket接入
let listener = TcpListener::bind("127.0.0.1:6380").await.unwrap();
loop {
// 接受socket
let (socket, _) = listener.accept().await.unwrap();
// 处理socket
process(socket).await;
}
}
async fn process(socket: TcpStream) {
let mut connection = Connection::new(socket);
if let Some(frame) = connection.read_frame().await.unwrap() {
println!("GOT: {:?}", frame);
// 响应一个错误
let response = Frame::Error("unimplemented".to_string());
connection.write_frame(&response).await.unwrap();
}
}
运行服务
cargo run
将之前写的 Hello Tokio 程序移至src/bin
下,然后执行
cargo run --bin hello
后续服务端启动通过
cargo run --bin my-redis
启动
并发性
上面实现的 Redis 服务器还有一个小问题。服务端每次只能处理一个传入请求,当连接被接收时,服务器会一直停留在接收循环块中,直到响应完成。为了能够处理更多并发请求,需要做一些优化。
为了并发处理 Socket 连接,我们为每一个 Socket 生成一个 Task,每个 Task 都用于处理连接。
async fn main() {
// 监听6380端口 等待socket接入
let listener = TcpListener::bind("127.0.0.1:6380").await.unwrap();
loop {
// 接受socket
let (socket, _) = listener.accept().await.unwrap();
// 使用spawn启动一个新任务
tokio::spawn(async move {
process(socket).await;
});
}
}
一个 Tokio Task 是一个异步的安全线程,它们可以通过async
块传递给tokio::spawn
来创建。tokio::spawn
函数返回的是一个JobHandle
,调用者可以使用它与生成的任务进行交互。async
可能有一个返回值,调用者可以在JobHandle
上使用.await
获取返回值。
例如:
#[tokio::main]
async fn main() {
let handle = tokio::spawn(async {
"value"
});
let res = handle.await.unwrap();
println!("res: {:?}", res);
}
Task 是由调度程序的执行单元。生成任务会将其提交给 Tokio 调度程序,然后确保 Task 在有工作要做时执行。生成的 Task 可能会生成它的同一线程上执行,也可能会在不同的运行时线程上执行。Task 在生成后会在不同的线程之间移动。
Tokio 中的 Task 时非常轻量级的,在底层,它们仅需要分配一个 64 字节的内存。
'static
绑定
使用 Tokio 生成的 Task,其生命周期必须是'static
,这也就意味着生成的 Task 不能包含对外部拥有的数据的引用。
例如下面的代码就会报错:
#[tokio::main]
async fn main() {
let v = "value";
let handle = tokio::spawn(async {
v
});
let res = handle.await.unwrap();
println!("res: {:?}", res);
}
这是因为,默认情况下,变量的所有权不会移动到异步的代码块中,v
变量仍然由main
函数拥有。在println!
中借用了v
。
为解决这个错误,我们可以使用async move
将v
移动到异步代码块,让它'static
。如果一个数据需要被多个 Task 访问,那么必须使用同步原语(如**Arc**
)来共享它。
实现 my-redis 的数据存储
接下来,将实现process
函数来处理传入的命令。我们使用HashMap
来存储数值,SET
命令将数据插入到HashMap
中,GET
命令根据 key 从HashMap
中取出来。
async fn process(socket: TcpStream) {
use mini_redis::Command::{self, Get, Set};
use std::collections::HashMap;
// 使用HashMap 作为数据库
let mut db = HashMap::new();
let mut connection = Connection::new(socket);
// 循环读取frame
while let Some(frame) = connection.read_frame().await.unwrap() {
// 解析frame
let response = match Command::from_frame(frame).unwrap() {
// 设置值
Set(cmd) => {
db.insert(cmd.key().to_string(), cmd.value().to_vec());
Frame::Simple("OK".to_string())
}
// 获取值
Get(cmd) => {
if let Some(value) = db.get(cmd.key()) {
Frame::Bulk(value.clone().into())
} else {
Frame::Null
}
}
cmd => panic!("unimplemented {:?}", cmd),
};
connection.write_frame(&response).await.unwrap();
}
}
现在我们就可以设置值并获取到的值了。但是,目前的值在各个 Socket 中并不是共享的,如果另外一个连接进来尝试获取hello
,就无法获取到值了。
4. 实现 Mini-Redis: 状态共享及锁
在上篇中,我们实现了一个支持简单 GET/SET 命令操作的 Key-Value DB。但是,该 DB 存在一个致命的缺陷:各个 Socket 的数据不是共享的。本节就介绍下 Tokio 的共享数据。
共享策略
- 互斥锁
Mutex
保护共享状态; - 生成一个 Task 来管理状态,并使用消息传递来操作它。
对于简单的数据可以采用第一种方式,对于需要一部工作的如 IO 原语则使用第二种方法。在上篇中,我们使用 HashMap 存储数据,执行的操作是同步的insert
和get
,若想让其在各个 Socket 之间共享,则应该使用第一种方式:Mutex
。
bytes
库介绍
bytes
是 Rust 中一个轻量级的库,用于高效地处理字节序列。它广泛用于网络编程、I/O 操作和其他需要处理字节流的场景。
特性
- 零拷贝(Zero-Copy):
bytes
的核心设计目标是尽量减少数据拷贝,通过引用计数共享底层字节缓冲区。 - 分片(Slicing): 支持对字节缓冲区进行分片,操作更高效且避免额外的内存分配。
- 灵活性: 提供了多种 API 来满足不同的需求,比如读写操作、解析、序列化等。
- 高性能: 它针对性能进行了优化,是异步环境和高吞吐量应用的理想选择。
在实现My-Redis
中,存储的 value 将通过bytes
存储。
定义共享的 HashMap
type Db = Arc<Mutex<HashMap<String, Bytes>>>;
其中,Arc 和 Mutex 来自标准库而非 Tokio,即std::sync::{Arc,Mutex}
。Tokio 中的tokio::sync::Mutex
是异步互斥锁,只有在调用.await
时才会被锁定,而标准库中的Mutex
是同步互斥锁,它会在等待锁时阻塞当前线程。
修改process
函数
在定义了共享的 HashMap 后,就需要修改process
函数了。此时process
就不需要初始化一个 HashMap,而是传入一个 HashMap 的句柄,在process
中通过该句柄对其操作。
async fn process(socket: TcpStream, db: Db) {
use mini_redis::Command::{self, Get, Set};
let mut connection = Connection::new(socket);
// 循环读取frame
while let Some(frame) = connection.read_frame().await.unwrap() {
// 解析frame
let response = match Command::from_frame(frame).unwrap() {
// 设置值
Set(cmd) => {
// 获取锁
let mut db = db.lock().unwrap();
db.insert(cmd.key().to_string(), cmd.value().clone());
Frame::Simple("OK".to_string())
}
// 获取值
Get(cmd) => {
// 获取锁
let db = db.lock().unwrap();
if let Some(value) = db.get(cmd.key()) {
Frame::Bulk(value.clone())
} else {
Frame::Null
}
}
cmd => panic!("unimplemented {:?}", cmd),
};
connection.write_frame(&response).await.unwrap();
}
}
完整代码
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use bytes::Bytes;
use mini_redis::{Connection, Frame};
use tokio::net::{TcpListener, TcpStream};
type Db = Arc<Mutex<HashMap<String, Bytes>>>;
#[tokio::main]
async fn main() {
// 监听6380端口 等待socket接入
let listener = TcpListener::bind("127.0.0.1:6380").await.unwrap();
// 初始化一个可共享的db结构
let db: Db = Arc::new(Mutex::new(HashMap::<String, Bytes>::new()));
loop {
// 接受socket
let (socket, _) = listener.accept().await.unwrap();
// clone
let db: Db = db.clone();
// 使用spawn启动一个新任务
tokio::spawn(async move {
process(socket, db).await;
});
}
}
async fn process(socket: TcpStream, db: Db) {
use mini_redis::Command::{self, Get, Set};
let mut connection = Connection::new(socket);
// 循环读取frame
while let Some(frame) = connection.read_frame().await.unwrap() {
// 解析frame
let response = match Command::from_frame(frame).unwrap() {
// 设置值
Set(cmd) => {
let mut db = db.lock().unwrap();
db.insert(cmd.key().to_string(), cmd.value().clone());
Frame::Simple("OK".to_string())
}
// 获取值
Get(cmd) => {
let db = db.lock().unwrap();
if let Some(value) = db.get(cmd.key()) {
Frame::Bulk(value.clone())
} else {
Frame::Null
}
}
cmd => panic!("unimplemented {:?}", cmd),
};
connection.write_frame(&response).await.unwrap();
}
}
5. 实现 Mini-Redis: 分段锁优化
使用简单的同步互斥锁在竞争者很少的情况下是可以接受的,但是,若竞争者很多时,当锁被获取,执行任务的线程必须阻塞并等待锁,这不仅会阻塞当前任务,还会阻塞当前线程上调度的所有其他任务。
默认情况下,Tokio 运行时使用多线程调用程序。任务被安排在Runtime
管理的任意线程上。若安排执行大量任务,并且它们都需要访问互斥锁,那么就会发生争用,降低执行效率。
若同步互斥锁上的竞争会导致一些问题,可以考虑下面的方法:
- 使用一个专门的任务来管理状态并使用消息传递;
- 采用分片互斥锁;
- 重构代码避免互斥锁;
分段互斥锁的实现
在本项目中,由于每个key
是独立的,互斥分片就是一个不错的选择。因此,我们引入N
个不同的实例,而不是只有一个Mutex<HashMap<_,_>>
实例。
// 单个数据库
type Db = Arc<Mutex<HashMap<String, Bytes>>>;
// 分段锁 分段数据库
type ShardedDb = Arc<Vec<Db>>;
// 创建分段数据库
fn new_sharded_db(num_shards: usize) -> ShardedDb {
let mut db = Vec::with_capacity(num_shards);
for _ in 0..num_shards {
db.push(Arc::new(Mutex::new(HashMap::new())));
}
Arc::new(db)
}
在操作ShardedDb
时,可以根据key
进行计算获取对应的锁。简单起见,采用hash(key) & size
的方式获取。
完整代码如下:
use std::{
collections::hash_map::DefaultHasher,
collections::HashMap,
hash::{Hash, Hasher},
sync::{Arc, Mutex},
};
use bytes::Bytes;
use mini_redis::{Connection, Frame};
use tokio::net::{TcpListener, TcpStream};
// 单个数据库
type Db = Arc<Mutex<HashMap<String, Bytes>>>;
// 分段锁 分段数据库
type ShardedDb = Arc<Vec<Db>>;
// 创建分段数据库
fn new_sharded_db(num_shards: usize) -> ShardedDb {
let mut db = Vec::with_capacity(num_shards);
for _ in 0..num_shards {
db.push(Arc::new(Mutex::new(HashMap::new())));
}
Arc::new(db)
}
// 计算Hash值
fn hash<T: Hash>(val: &T) -> usize {
let mut hasher = DefaultHasher::new();
val.hash(&mut hasher);
hasher.finish() as usize
}
#[tokio::main]
async fn main() {
// 监听6380端口 等待socket接入
let listener = TcpListener::bind("127.0.0.1:6380").await.unwrap();
let db: ShardedDb = new_sharded_db(64);
loop {
// 接受socket
let (socket, _) = listener.accept().await.unwrap();
let db: ShardedDb = db.clone();
// 使用spawn启动一个新任务
tokio::spawn(async move {
process(socket, db).await;
});
}
}
async fn process(socket: TcpStream, db: ShardedDb) {
use mini_redis::Command::{self, Get, Set};
let mut connection = Connection::new(socket);
// 循环读取frame
while let Some(frame) = connection.read_frame().await.unwrap() {
// 解析frame
let response = match Command::from_frame(frame).unwrap() {
// 设置值
Set(cmd) => {
let key = cmd.key().to_string();
let mut db = db[hash(&key) % db.len()].lock().unwrap();
db.insert(key, cmd.value().clone());
Frame::Simple("OK".to_string())
}
// 获取值
Get(cmd) => {
let key = cmd.key().to_string();
let db = db[hash(&key) % db.len()].lock().unwrap();
if let Some(value) = db.get(&key) {
Frame::Bulk(value.clone())
} else {
Frame::Null
}
}
cmd => panic!("unimplemented {:?}", cmd),
};
connection.write_frame(&response).await.unwrap();
}
}
至此,一个非常简单的 Redis 服务就实现了。