Tokio学习笔记(官方文档)- 上

16次阅读
没有评论

共计 10914 个字符,预计需要花费 28 分钟才能阅读完成。

内容目录

Tokio 学习笔记(上)

1. 概述

 Tokio 是一个基于 Rust 的异步运行时,专为构建高性能网络应用而设计。它提供了异步 I/O、任务调度器和工具链支持,可以与 Rust 的异步特性 (async/await) 无缝集成。

 当我们以异步方式编写应用程序时,通过减少并发任务所需的资源,开源使其更好地扩展。然而异步的 Rust 代码无法自行运行,因此需要一个运行时(Runtime)来执行它。Tokio 是 Rust 中最广泛使用的运行时库。提供了许多有用的实用工具,在编写异步代码时,不能使用 Rust 标准库中提供的 API,而是使用 Tokio 提供的 API。

优势

  • Fast,快速:Tokio 基于 Rust 实现,Rust 本身就是一种系统级语言,非常快速。同时基于 async/await 语言特性上,在增加并发操作的数量上是非常高效的,从而让我们支持大量的并发任务。
  • Reliable,可靠:Tokio 基于 Rust 实现,Rust 自身就是安全型语言,因此十分可靠。
  • Easy,简单:Tokio 基于 async/await 特性,大大降低了编写异步应用程序的复杂性,再结合 Tokio 的工具及其生态,编写应用程序变得轻而易举。同时,Tokio 遵循标准库的命名约定,可以轻松地将仅仅使用标准库编写的代码转换为使用 Tokio 编写的代码。
  • Flexible,灵活:Tokio 提供了多种运行时的变体。multi-threadedwork-stealing轻量级single-threaded。每个运行时都提供了许多的 knobs,允许用户根据自己的需求进行调整。

不适用场景

  • 多线程并行运行,加速 CPU 计算:Tokio 的设计主要用于 IO 绑定应用程序,其中每个单独的任务大部分时间都在等待 IO。若想实现并行运行计算,应该使用 rayon。
  • 读取大量文件:操作系统通常不提供异步文件的 API,因此,在处理大量文件上也没有优势。
  • 发送单个网络请求:当你不需要一次做很多事时,采用异步反而会增大系统的复杂性,此时应该使用阻塞版本。

参考资料

2. 快速入门:Mini-Redis

Mini-Redis 是 Tokio 官方提供的一个方便学习 Tokio 的项目,因为是一个学习项目,因此不适用于实际生产环境。

安装 Mini-Redis

cargo install mini-redis

启动 Mini-Redis, --port 指定端口

mini-redis-server --port 6380

创建项目

mkdir tokio-tutorial && cd tokio-tutorial
cargo new my-redis

添加依赖

cargo add mini-redis
cargo add tokio --features=full

或者编写Cargo.toml文件

[dependencies]
mini-redis = "0.4.1"
tokio = { version = "1.43.0", features = ["full"] }

Hello Tokio

编写代码:

use mini_redis::{client, Result};

#[tokio::main]
async fn main() -> Result<()> {
    // 连接mini-redis
    let mut client = client::connect("127.0.0.1:6380").await?;
    // 设置值
    client.set("hello", "world".into()).await?;
    // 获取值
    let result = client.get("hello").await?;
    println!("result: {:?}", result);
    Ok(())
}

使用cargo run执行:

Tokio学习笔记(官方文档)- 上

代码解析

let mut client = client::connect("127.0.0.1:6380").await?;

client::connect是 mini-redis 提供的连接 API。它异步地与指定的远程地址建立了一个 TCP 连接。一旦连接建立成果,则会返回一个Client。代码整体看起来其实是同步的,但是.await操作符表示该操作是异步的。

大多数计算机程序的执行顺序与它的编写顺序相同。第一行执行,然后是下一行,以此类推。在同步编程中,若执行第一行时无法立即完成操作,此时就会阻塞,造成后续的操作无法继续执行。

在异步编程中,不能立即完成的操作会被暂停到后台,线程没有被阻塞,可以继续执行其它的事情。一旦操作完成,任务就会被取消暂停,继续从它离开的地方处理。

尽管异步编程可以带来更快的应用,但它往往增大了系统的复杂性,开发人员需要跟踪状态,而这一工作通常是繁琐且极易出错的。

编译时安全线程

Rust 使用一个叫做async/await的功能实现异步编程。执行异步操作的函数都有一个async关键字。例如在上述示例中client::connect函数的定义如下:

pub async fn connect<T: ToSocketAddrs>(addr: T) -> crate::Result<Client> {
    // The `addr` argument is passed directly to `TcpStream::connect`. This
    // performs any asynchronous DNS lookup and attempts to establish the TCP
    // connection. An error at either step returns an error, which is then
    // bubbled up to the caller of `mini_redis` connect.
    let socket = TcpStream::connect(addr).await?;

    // Initialize the connection state. This allocates read/write buffers to
    // perform redis protocol frame parsing.
    let connection = Connection::new(socket);

    Ok(Client { connection })
}

 Rust 在编译时会将async fn定义的函数转化为一个异步运行的 routine。在async fn中对.await的任何调用都会将控制权交还给线程。当操作在后台进行时,线程就可以做其他工作。

使用 async/await

异步函数的调用与其他 Rust 函数一样,但是在调用这些异步函数时,不会导致函数的主体部分立即执行,而是返回一个代表操作的值或 Future。要实际运行该操作时,应该在返回值上使用.await操作符。

例如:

async fn say_world() {
    println!("world");
}

#[tokio::main]
async fn main() {
    let task = say_world();
    println!("hello");
    task.await;
}

程序的输出为:

hello
world

只有在使用.await操作符才会实际执行say_world函数体内容。

异步 main 函数

在上面的代码中,我们的 main 函数定义如下:

#[tokio::main]
async fn main() {
    // function body
}
与传统main函数不一样:
  • 使用async声明为异步的
  • 使用注解#[tokio::main]

使用async fn是因为我们想进入一个异步上下文。然而,异步函数必须由一个运行时来执行,运行时包含异步任务调度、事件化 IO、计时器等,运行时不会自动启动,所以需要 main 函数启动它。

#[tokio::main]是一个宏,它将async fn main转化为同步的fn main,并初始化一个运行时实例执行异步 main 函数。

例如:

#[tokio::main]
async fn main() {
    println!("hello");
}

等价于

fn main() {
    tokio::runtime::Runtime::new().unwrap().block_on(async {
        println!("hello");
    });
}

3. 实现 Mini-Redis:接收 Socket

 Redis 服务器需要做的第一件事就是接收传入的 TCP Socket。要实现这一功能,可以使用tokio::net::TcpListener绑定到监听端口6380。然后 Socket 会在循环中被接收,每个 Socket 都会被处理然后关闭。

 现在实现:读取命令,将其打印到标准输出并回复一个错误。

use mini_redis::{Connection, Frame};
use tokio::net::{TcpListener, TcpStream};

#[tokio::main]
async fn main() {
    // 监听6380端口 等待socket接入
    let listener = TcpListener::bind("127.0.0.1:6380").await.unwrap();
    loop {
        // 接受socket
        let (socket, _) = listener.accept().await.unwrap();
        // 处理socket
        process(socket).await;
    }
}

async fn process(socket: TcpStream) {
    let mut connection = Connection::new(socket);
    if let Some(frame) = connection.read_frame().await.unwrap() {
        println!("GOT: {:?}", frame);

        // 响应一个错误
        let response = Frame::Error("unimplemented".to_string());

        connection.write_frame(&response).await.unwrap();
    }
}

运行服务

cargo run

将之前写的 Hello Tokio 程序移至src/bin下,然后执行

cargo run --bin hello

Tokio学习笔记(官方文档)- 上

后续服务端启动通过cargo run --bin my-redis启动

Tokio学习笔记(官方文档)- 上

并发性

 上面实现的 Redis 服务器还有一个小问题。服务端每次只能处理一个传入请求,当连接被接收时,服务器会一直停留在接收循环块中,直到响应完成。为了能够处理更多并发请求,需要做一些优化。

 为了并发处理 Socket 连接,我们为每一个 Socket 生成一个 Task,每个 Task 都用于处理连接。

async fn main() {
    // 监听6380端口 等待socket接入
    let listener = TcpListener::bind("127.0.0.1:6380").await.unwrap();
    loop {
        // 接受socket
        let (socket, _) = listener.accept().await.unwrap();
        // 使用spawn启动一个新任务
        tokio::spawn(async move {
            process(socket).await;
        });
    }
}

 一个 Tokio Task 是一个异步的安全线程,它们可以通过async块传递给tokio::spawn来创建。tokio::spawn函数返回的是一个JobHandle,调用者可以使用它与生成的任务进行交互。async可能有一个返回值,调用者可以在JobHandle上使用.await获取返回值。

例如:

#[tokio::main]
async fn main() {
    let handle = tokio::spawn(async {
        "value"
    });
    let res = handle.await.unwrap();
    println!("res: {:?}", res);
}

Task 是由调度程序的执行单元。生成任务会将其提交给 Tokio 调度程序,然后确保 Task 在有工作要做时执行。生成的 Task 可能会生成它的同一线程上执行,也可能会在不同的运行时线程上执行。Task 在生成后会在不同的线程之间移动。

Tokio 中的 Task 时非常轻量级的,在底层,它们仅需要分配一个 64 字节的内存。

'static绑定

 使用 Tokio 生成的 Task,其生命周期必须是'static,这也就意味着生成的 Task 不能包含对外部拥有的数据的引用

 例如下面的代码就会报错:

#[tokio::main]
async fn main() {
    let v = "value";
    let handle = tokio::spawn(async {
        v
    });
    let res = handle.await.unwrap();
    println!("res: {:?}", res);
}

 这是因为,默认情况下,变量的所有权不会移动到异步的代码块中,v变量仍然由main函数拥有。在println!中借用了v

 为解决这个错误,我们可以使用async movev移动到异步代码块,让它'static如果一个数据需要被多个 Task 访问,那么必须使用同步原语(如**Arc**)来共享它

实现 my-redis 的数据存储

 接下来,将实现process函数来处理传入的命令。我们使用HashMap来存储数值,SET命令将数据插入到HashMap中,GET命令根据 key 从HashMap中取出来。


async fn process(socket: TcpStream) {
    use mini_redis::Command::{self, Get, Set};
    use std::collections::HashMap;
    // 使用HashMap 作为数据库
    let mut db = HashMap::new();

    let mut connection = Connection::new(socket);

    // 循环读取frame
    while let Some(frame) = connection.read_frame().await.unwrap() {
        // 解析frame
        let response = match Command::from_frame(frame).unwrap() {
            // 设置值
            Set(cmd) => {
                db.insert(cmd.key().to_string(), cmd.value().to_vec());
                Frame::Simple("OK".to_string())
            }
            // 获取值
            Get(cmd) => {

                if let Some(value) = db.get(cmd.key()) {
                    Frame::Bulk(value.clone().into())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };
        connection.write_frame(&response).await.unwrap();
    }
}

Tokio学习笔记(官方文档)- 上

 现在我们就可以设置值并获取到的值了。但是,目前的值在各个 Socket 中并不是共享的,如果另外一个连接进来尝试获取hello,就无法获取到值了。

4. 实现 Mini-Redis: 状态共享及锁

 在上篇中,我们实现了一个支持简单 GET/SET 命令操作的 Key-Value DB。但是,该 DB 存在一个致命的缺陷:各个 Socket 的数据不是共享的。本节就介绍下 Tokio 的共享数据。

共享策略

  • 互斥锁Mutex保护共享状态;
  • 生成一个 Task 来管理状态,并使用消息传递来操作它。

 对于简单的数据可以采用第一种方式,对于需要一部工作的如 IO 原语则使用第二种方法。在上篇中,我们使用 HashMap 存储数据,执行的操作是同步的insertget,若想让其在各个 Socket 之间共享,则应该使用第一种方式:Mutex

bytes库介绍

bytes 是 Rust 中一个轻量级的库,用于高效地处理字节序列。它广泛用于网络编程、I/O 操作和其他需要处理字节流的场景。

特性

  1. 零拷贝(Zero-Copy):bytes 的核心设计目标是尽量减少数据拷贝,通过引用计数共享底层字节缓冲区。
  2. 分片(Slicing): 支持对字节缓冲区进行分片,操作更高效且避免额外的内存分配。
  3. 灵活性: 提供了多种 API 来满足不同的需求,比如读写操作、解析、序列化等。
  4. 高性能: 它针对性能进行了优化,是异步环境和高吞吐量应用的理想选择。

 在实现My-Redis中,存储的 value 将通过bytes存储。

定义共享的 HashMap

type Db = Arc<Mutex<HashMap<String, Bytes>>>;

 其中,Arc 和 Mutex 来自标准库而非 Tokio,即std::sync::{Arc,Mutex}。Tokio 中的tokio::sync::Mutex是异步互斥锁,只有在调用.await时才会被锁定,而标准库中的Mutex是同步互斥锁,它会在等待锁时阻塞当前线程。

修改process函数

 在定义了共享的 HashMap 后,就需要修改process函数了。此时process就不需要初始化一个 HashMap,而是传入一个 HashMap 的句柄,在process中通过该句柄对其操作。

async fn process(socket: TcpStream, db: Db) {
    use mini_redis::Command::{self, Get, Set};

    let mut connection = Connection::new(socket);

    // 循环读取frame
    while let Some(frame) = connection.read_frame().await.unwrap() {
        // 解析frame
        let response = match Command::from_frame(frame).unwrap() {
            // 设置值
            Set(cmd) => {
                // 获取锁
                let mut db = db.lock().unwrap();
                db.insert(cmd.key().to_string(), cmd.value().clone());
                Frame::Simple("OK".to_string())
            }
            // 获取值
            Get(cmd) => {
                // 获取锁
                let db = db.lock().unwrap();
                if let Some(value) = db.get(cmd.key()) {
                    Frame::Bulk(value.clone())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };
        connection.write_frame(&response).await.unwrap();
    }
}

完整代码

use std::{
    collections::HashMap,
    sync::{Arc, Mutex},
};

use bytes::Bytes;
use mini_redis::{Connection, Frame};
use tokio::net::{TcpListener, TcpStream};

type Db = Arc<Mutex<HashMap<String, Bytes>>>;

#[tokio::main]
async fn main() {
    // 监听6380端口 等待socket接入
    let listener = TcpListener::bind("127.0.0.1:6380").await.unwrap();
    // 初始化一个可共享的db结构
    let db: Db = Arc::new(Mutex::new(HashMap::<String, Bytes>::new()));

    loop {
        // 接受socket
        let (socket, _) = listener.accept().await.unwrap();
        // clone
        let db: Db = db.clone();
        // 使用spawn启动一个新任务
        tokio::spawn(async move {
            process(socket, db).await;
        });
    }
}

async fn process(socket: TcpStream, db: Db) {
    use mini_redis::Command::{self, Get, Set};

    let mut connection = Connection::new(socket);

    // 循环读取frame
    while let Some(frame) = connection.read_frame().await.unwrap() {
        // 解析frame
        let response = match Command::from_frame(frame).unwrap() {
            // 设置值
            Set(cmd) => {
                let mut db = db.lock().unwrap();
                db.insert(cmd.key().to_string(), cmd.value().clone());
                Frame::Simple("OK".to_string())
            }
            // 获取值
            Get(cmd) => {
                let db = db.lock().unwrap();
                if let Some(value) = db.get(cmd.key()) {
                    Frame::Bulk(value.clone())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };
        connection.write_frame(&response).await.unwrap();
    }
}

5. 实现 Mini-Redis: 分段锁优化

 使用简单的同步互斥锁在竞争者很少的情况下是可以接受的,但是,若竞争者很多时,当锁被获取,执行任务的线程必须阻塞并等待锁,这不仅会阻塞当前任务,还会阻塞当前线程上调度的所有其他任务。

 默认情况下,Tokio 运行时使用多线程调用程序。任务被安排在Runtime管理的任意线程上。若安排执行大量任务,并且它们都需要访问互斥锁,那么就会发生争用,降低执行效率。

 若同步互斥锁上的竞争会导致一些问题,可以考虑下面的方法:

  • 使用一个专门的任务来管理状态并使用消息传递;
  • 采用分片互斥锁;
  • 重构代码避免互斥锁;

分段互斥锁的实现

 在本项目中,由于每个key是独立的,互斥分片就是一个不错的选择。因此,我们引入N个不同的实例,而不是只有一个Mutex<HashMap<_,_>>实例。

// 单个数据库
type Db = Arc<Mutex<HashMap<String, Bytes>>>;
// 分段锁 分段数据库
type ShardedDb = Arc<Vec<Db>>;

// 创建分段数据库
fn new_sharded_db(num_shards: usize) -> ShardedDb {
    let mut db = Vec::with_capacity(num_shards);
    for _ in 0..num_shards {
        db.push(Arc::new(Mutex::new(HashMap::new())));
    }
    Arc::new(db)
}

 在操作ShardedDb时,可以根据key进行计算获取对应的锁。简单起见,采用hash(key) & size的方式获取。

完整代码如下:

use std::{
    collections::hash_map::DefaultHasher,
    collections::HashMap,
    hash::{Hash, Hasher},
    sync::{Arc, Mutex},
};

use bytes::Bytes;
use mini_redis::{Connection, Frame};
use tokio::net::{TcpListener, TcpStream};

// 单个数据库
type Db = Arc<Mutex<HashMap<String, Bytes>>>;
// 分段锁 分段数据库
type ShardedDb = Arc<Vec<Db>>;

// 创建分段数据库
fn new_sharded_db(num_shards: usize) -> ShardedDb {
    let mut db = Vec::with_capacity(num_shards);
    for _ in 0..num_shards {
        db.push(Arc::new(Mutex::new(HashMap::new())));
    }
    Arc::new(db)
}

// 计算Hash值
fn hash<T: Hash>(val: &T) -> usize {
    let mut hasher = DefaultHasher::new();
    val.hash(&mut hasher);
    hasher.finish() as usize
}

#[tokio::main]
async fn main() {
    // 监听6380端口 等待socket接入
    let listener = TcpListener::bind("127.0.0.1:6380").await.unwrap();
    let db: ShardedDb = new_sharded_db(64);

    loop {
        // 接受socket
        let (socket, _) = listener.accept().await.unwrap();
        let db: ShardedDb = db.clone();
        // 使用spawn启动一个新任务
        tokio::spawn(async move {
            process(socket, db).await;
        });
    }
}

async fn process(socket: TcpStream, db: ShardedDb) {
    use mini_redis::Command::{self, Get, Set};

    let mut connection = Connection::new(socket);

    // 循环读取frame
    while let Some(frame) = connection.read_frame().await.unwrap() {
        // 解析frame
        let response = match Command::from_frame(frame).unwrap() {
            // 设置值
            Set(cmd) => {
                let key = cmd.key().to_string();
                let mut db = db[hash(&key) % db.len()].lock().unwrap();
                db.insert(key, cmd.value().clone());
                Frame::Simple("OK".to_string())
            }
            // 获取值
            Get(cmd) => {
                let key = cmd.key().to_string();
                let db = db[hash(&key) % db.len()].lock().unwrap();
                if let Some(value) = db.get(&key) {
                    Frame::Bulk(value.clone())
                } else {
                    Frame::Null
                }
            }
            cmd => panic!("unimplemented {:?}", cmd),
        };
        connection.write_frame(&response).await.unwrap();
    }
}

至此,一个非常简单的 Redis 服务就实现了。

正文完
 
PG Thinker
版权声明:本站原创文章,由 PG Thinker 2025-01-19发表,共计10914字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)
热评文章
Rust 编译并使用 Protobuf

Rust 编译并使用 Protobuf

内容目录 Rust 编译并使用 Protobuf 必要的依赖库 prost: https://github.c...
Tokio学习笔记(官方文档)- 上

Tokio学习笔记(官方文档)- 上

Tokio学习笔记,以官方文档为例,手撸一个简单的Mini-Redis。上篇设计:Tokio介绍、快速入门、spawn、shared state。