使用Async Stream和Sink以及codec Framed

在前面介绍异步IO的时候,相关的读写操作都非常底层,要么直接操作字节,要么直接操作更高一层的Buffer,这两种方式都没有明确实际想要读和写的内容是什么,只是最原始的没有意义的字节或字符串。这种原始的操作方式比较底层,也容易出错,更不方便后期扩展和维护。

tokio_util::codec的作用是按照提前制定好的数据格式设计出对应的数据结构,之后直接以该数据结构为读写的操作单位。换句话说,codec其实就是编码和解码的作用,就像serde的角色一样。

而Async Stream和Sink则是相比于直接读写更底层字节字符串而言要更高一层的用来读写有具体意义的数据的工具。codec将AsyncRead和AsyncWrite转换为Stream和Sink,并使得Stream和Sink可以以Frame为读写单位进行读写操作。

tokio、futures、futures_util、futures_core之间的关系

在开始介绍async Stream和Sink之前,有必要先理解清楚这几个库的关系。

在很多地方都看到有人疑惑tokio与futures的关系,大概是因为大家学的第一个Rust异步库是tokio,但却在不少示例和代码中发现引入了futures中的东西,于是产生这种疑惑。看这几个库的文档首页即可找到答案。

这是tokio_stream中的pub use:


#![allow(unused)]
fn main() {
pub use futures_core::Stream;	
}

这是futures中的pub use:


#![allow(unused)]
fn main() {
pub use futures_core::future::Future;	
pub use futures_core::future::TryFuture;	
pub use futures_util::future::FutureExt;	
pub use futures_util::future::TryFutureExt;	
pub use futures_core::stream::Stream;	
pub use futures_core::stream::TryStream;	
pub use futures_util::stream::StreamExt;	
pub use futures_util::stream::TryStreamExt;	
pub use futures_sink::Sink;	
pub use futures_util::sink::SinkExt;	
pub use futures_io::AsyncBufRead;	
pub use futures_io::AsyncRead;	
pub use futures_io::AsyncSeek;	
pub use futures_io::AsyncWrite;	
pub use futures_util::AsyncBufReadExt;	
pub use futures_util::AsyncReadExt;	
pub use futures_util::AsyncSeekExt;	
pub use futures_util::AsyncWriteExt;
}

这是futures_util中的pub use:


#![allow(unused)]
fn main() {
// 查看futures_util的源码,不难发现Future、Stream、Sink等
// 都是futures_core中对应类型的重新导出
pub use crate::future::Future;	
pub use crate::future::FutureExt;	
pub use crate::future::TryFuture;	
pub use crate::future::TryFutureExt;	
pub use crate::stream::Stream;	
pub use crate::stream::StreamExt;	
pub use crate::stream::TryStream;	
pub use crate::stream::TryStreamExt;	
pub use crate::sink::Sink;	
pub use crate::sink::SinkExt;	
pub use crate::io::AsyncBufRead;	
pub use crate::io::AsyncBufReadExt;	
pub use crate::io::AsyncRead;	
pub use crate::io::AsyncReadExt;	
pub use crate::io::AsyncSeek;	
pub use crate::io::AsyncSeekExt;	
pub use crate::io::AsyncWrite;	
pub use crate::io::AsyncWriteExt;
}

显然,Stream相关的类型和Sink相关的类型,其实都来自于futures_core。因此,在需要使用到Stream相关类型或Sink相关类型的代码文件中,引入以上任意一个库都行。

Async Stream Trait

Stream Trait用于读操作,它模拟Rust标准库的Iterator,可进行迭代式读取和迭代式操作,非常具有Rust的风味。

例如:

use tokio_stream::{self as stream, StreamExt};

#[tokio::main]
async fn main() {
    let mut stream = stream::iter(vec![0, 1, 2]);

    while let Some(value) = stream.next().await {
        println!("Got {}", value);
    }
}

上面示例中通过tokio_stream::iter()创建了一个Stream,然后通过不断调用Stream的next()方法来读取Stream中的下一个数据。需要注意的是,目前不能对Stream执行for value in stream{}的迭代操作,只能不断显式地调用next()方法来读取。比如可以使用下面两种循环读取的方式。


#![allow(unused)]
fn main() {
while let Some(value) = s.next().await {}

loop {
  match s.next().await {
    Some(value) => {}
    None => break;
  }
}
}

有很多场景下需要手动提供Stream,上面使用的tokio_stream::iter()是一个很常用的创建Stream的方法,它返回tokio_stream::Iter,该类型实现了Stream。

tokio_stream::wrappers中还提供了一些对tokio中的类型封装,并将封装后的类型实现了Stream,因此它们都可以直接作为Stream使用。例如,wrappers::ReceiverStream是对tokio::sync::mpsc::Receiver的封装,并实现了Stream。参考https://docs.rs/tokio-stream/latest/tokio_stream/wrappers/index.html

最后,还有一个比较常用的生成Stream的方式的是使用async-stream crate提供的宏,它可以返回impl Stream类型。

Async Sink Trait

Sink Trait用于写操作。Sink的意思是下沉、沉入,表示可直接通过Sink方便简单地写入有意义的数据,Sink会自动将该数据转换为更底层的字节传输出去(事物放在水面即可,它会自动下沉到底层)。

这就是使用Sink过程中所需要理解的全部啦。

StreamExt和SinkExt

Async Stream和Async Sink都只提供了非常原始的方法,更多时候我们会使用StreamExt和SinkExt中提供的扩展方法来简化读写操作。其中tokio_stream提供了StreamExt,futuresfutures_util中提供了StreamExt和SinkExt,因此需要引入相关库才能使用相关的扩展方法。

关于StreamExt提供的用法,可参考官方手册(https://docs.rs/tokio-stream/latest/tokio_stream/trait.StreamExt.html),用法和Iterator没有太大区别,因此不多赘述。

有关SinkExt提供的方法,有必要介绍主要的几个写入方法:

  • send():写入Sink并flush
  • feed():写入Sink但不flush
  • flush():将已经写入Sink的数据flush
  • send_all():将给定的Stream中的零或多个数据全部写入Sink并一次或多次flush(自动决定何时flush)

用法示例如下:下面代码中的sink是一个Sink,其传输的有意义的数据格式是字符串,msg类型是字符串。


#![allow(unused)]
fn main() {
// 方式一: feed() + flush()
sink.feed(msg).await.unwrap();
sink.flush().await.unwrap();

// 方式二: send() == feed() + flush()
sink.send(msg).await.unwrap();

// 方式三:send_all(),一次发送一条或多条,但只允许futures::TryStream作为参数,
// 所以要用到futures crates来构建Stream。例如:
// let msgs = vec![Ok("hello world".to_string()), Ok("HELLO WORLD".to_string())];
let msgs = vec!["hello world".to_string(), "HELLO WORLD".to_string()];
let mut ss = futures_util::stream::iter(msgs).map(Ok);
sink.send_all(&mut ss).await.unwrap();
}

codec和Framed

tokio_util::codec可以将实现了AsyncRead/AsyncWrite的结果转换为Stream/Sink,得到的Stream和Sink可以以帧Frame为读写单位。

其中:

  • 实现了codec::Decoder的类型可以转换AsyncRead为FramedRead,FramedRead实现了Stream,因此FramedRead可进行按帧读取的操作
  • 实现了codec::Encoder的类型可以转换AsyncWrite为FramedWrite,FramedWrite实现了Sink,因此FramedWrite可进行按帧写入的操作
  • 同时实现了Decoder和Encoder的类型可转换为Framed(当然,也可以转换为FramedRead或FramedWrite),Framed既是Stream,也是Sink,可同时进行以帧为单位的读写操作。Framed可通过split()方法分离出独立的Stream和Sink分别进行读写

codec还提供了几个常用的已经同时实现了Decoder、Encoder的类型:

  • LinesCodec:以行为单位的帧
  • AnyDelimiterCodec:以指定分隔符为单位的帧
  • BytesCodec:以字节为单位的帧

因为它们同时实现了Decoder和Encoder,因此它们可转换为FramedRead、FramedWrite或Framed。

将Decoder、Encoder转换为对应Framed的方式参考如下:


#![allow(unused)]
fn main() {
// T是实现了AsyncRead的类型,例如TcpStream、File等  
// D是实现了Decoder的类型,例如LinesCodec、BytesCodec等
let framed_reader = FramedRead::new(T, D);

// T是实现了AsyncWrite的类型,例如TcpStream、File等  
// E是实现了Encoder的类型,例如LinesCodec、BytesCodec等
let framed_writer = FramedWrite::new(T, E);

// T是实现了AsyncRead + AsyncWrite的类型,例如TcpStream、File等  
// U是实现了Encoder + Decoder的类型,例如LinesCodec、BytesCodec等
let framed = Framed::new(T, U);

// T是实现了AsyncRead + AsyncWrite的类型,例如TcpStream、File等  
// 只要实现了Decoder,例如LinesCodec、BytesCodec等,就可以通过它的framed()方法生成Framed
let framed = LinesCodec::new().framed(T);
}

例如,通过LinesCodec使TcpStream能够按行进行读写操作,下面是Server端按行读写客户端的一个示例:

use futures_util::stream::{SplitSink, SplitStream};
use futures_util::{SinkExt, StreamExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::mpsc;
use tokio_util::codec::{Framed, LinesCodec};

type LineFramedStream = SplitStream<Framed<TcpStream, LinesCodec>>;
type LineFramedSink = SplitSink<Framed<TcpStream, LinesCodec>, String>;

#[tokio::main]
async fn main() {
    let server = TcpListener::bind("127.0.0.1:8888").await.unwrap();
    while let Ok((client_stream, _client_addr)) = server.accept().await {
        tokio::spawn(async move {
            process_client(client_stream).await;
        });
    }
}

async fn process_client(client_stream: TcpStream) {
  	// 将TcpStream转换为Framed
    let framed = Framed::new(client_stream, LinesCodec::new());
    // 将Framed分离,可得到独立的读写端
    let (frame_writer, frame_reader) = framed.split::<String>();
    // 当Reader从客户端读取到数据后,发送到通道中,
    // 另一个异步任务读取该通道,从通道中读取到数据后,将内容按行写给客户端
    let (msg_tx, msg_rx) = mpsc::channel::<String>(100);

    // 负责读客户端的异步子任务
    let mut read_task = tokio::spawn(async move {
        read_from_client(frame_reader, msg_tx).await;
    });

    // 负责向客户端写行数据的异步子任务
    let mut write_task = tokio::spawn(async move {
        write_to_client(frame_writer, msg_rx).await;
    });

    // 无论是读任务还是写任务的终止,另一个任务都将没有继续存在的意义,因此都将另一个任务也终止
    if tokio::try_join!(&mut read_task, &mut write_task).is_err() {
        eprintln!("read_task/write_task terminated");
        read_task.abort();
        write_task.abort();
    };
}

async fn read_from_client(mut reader: LineFramedStream, msg_tx: mpsc::Sender<String>) {
    loop {
        match reader.next().await {
            None => {
                println!("client closed");
                break;
            }
            Some(Err(e)) => {
                eprintln!("read from client error: {}", e);
                break;
            }
            Some(Ok(str)) => {
                println!("read from client. content: {}", str);
                // 将内容发送给writer,让writer响应给客户端,
                // 如果无法发送给writer,继续从客户端读取内容将没有意义,因此break退出
                if msg_tx.send(str).await.is_err() {
                    eprintln!("receiver closed");
                }
            }
        }
    }
}

async fn write_to_client(mut writer: LineFramedSink, mut msg_rx: mpsc::Receiver<String>) {
    while let Some(str) = msg_rx.recv().await {
        if writer.send(str).await.is_err() {
            eprintln!("write to client failed");
            break;
        }
    }
}