使用tokio::net进行网络编程

tokio提供了类似std::net所提供的基本设施以便进行异步网络编程,主要包括tcp、udp和unix domain三方面。

网络编程需要大量的网络编程知识,且和IO编程息息相关,因暂时还未介绍tokio::io,所以本文暂且仅介绍tokio::net的tcp编程相关的基础设施,不涉及具体的网络编程逻辑。(所以本文会比较枯燥,基本上是对官方文档的总结和引用)

要使用tokio::net,需在Cargo.toml文件中开启net特性:

tokio = {version = "1.13", features = ["rt", "net", "rt-multi-thread"]}

开启该特性之后,将可使用以下三个组件:

  • TcpSocket: 创建和操作套接字的基础组件
  • TcpListener: 对TcpSocket的一些封装,主要提供服务端套接字的相关操作
  • TcpStream: 代表已建立的可直接传递数据的连接,对客户端来说代表已经被服务端接收,对服务端来说代表accept后的套接字

通常客户端可直接使用TcpStream,服务端可直接使用TcpListener和TcpStream,如果需要自定义修改套接字的选项或属性,则考虑使用TcpSocket。

IpAddr和SocketAddr

在开始介绍tokio::net之前,需先简单介绍一下与之相关的std::net::IpAddrstd::net::SocketAddr(注意它们来自标准库)。

IpAddr

IpAddr封装了IP地址,包括IP v4地址和IP v6地址:


#![allow(unused)]
fn main() {
pub enum IpAddr {
    V4(Ipv4Addr),
    V6(Ipv6Addr),
}
}

IpAddr实现了FromStr,可直接将代表IP地址的字符串解析为IpAddr:


#![allow(unused)]
fn main() {
let localhsot: IpAddr = "127.0.0.1".parse().unwrap();
}

例如:


#![allow(unused)]
fn main() {
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};

let localhost = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
assert_eq!("127.0.0.1".parse(), Ok(localhost));
}

IpAddr还有一些方法,主要是一些布尔判断方法:

  • is_ipv4():是否是一个ipv4地址
  • is_ipv6():是否是一个ipv6地址
  • is_loopack():是否是一个loopback地址
  • is_multicast():是否是一个多播地址
  • is_unspecified():是否是一个0.0.0.0地址

IpAddr封装了ip v4地址或ip v6地址,以代表ip v4地址的Ipv4Addr为例。可使用new()并提供4个u8参数来创建ip v4地址:


#![allow(unused)]
fn main() {
use std::net::Ipv4Addr;

let localhost = Ipv4Addr::new(127, 0, 0, 1);
}

Ipv4Addr实现了FromStr,也可以很方便地直接将字符串解析为ip地址:


#![allow(unused)]
fn main() {
let localhost = "127.0.0.1".parse().unwrap();
}

可使用octets()将一个IP地址转换为u8数组,即new()的反向操作:


#![allow(unused)]
fn main() {
use std::net::Ipv4Addr;

let addr = Ipv4Addr::new(127, 0, 0, 1);
assert_eq!(addr.octets(), [127, 0, 0, 1]);
}

Ipv4Addr还有其它一些方法,多数都是布尔判断方法:

  • is_broadcast(): 是否是广播地址(255.255.255.255)
  • is_multicast(): 是否是多播地址(224.0.0.0/4)
  • is_private(): 是否是私有地址(10.0.0.0/8、172.16.0.0/12、192.168.0.0/16)
  • is_link_local(): 是否是链路本地地址(169.254.0.0/16)
  • is_loopback(): 是否是环回地址(127.0.0.0/8)
  • is_unspecified(): 是否是0.0.0.0

此外,可直接对地址进行大小比较和等值比较。

SocketAddr

SocketAddr代表包含了IP地址和端口号的套接字地址,它封装了ipv4套接字地址和ipv6套接字地址:


#![allow(unused)]
fn main() {
pub enum SocketAddr {
    V4(SocketAddrV4),
    V6(SocketAddrV6),
}
}

SocketAddr实现了FromStr,因此可直接将代表套接字地址的字符串解析为SocketAddr:


#![allow(unused)]
fn main() {
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

let socket: SocketAddr = "127.0.0.1:8080".parse().unwrap();
}

SocketAddr自身也提供了new()方法,需提供IpAddr和端口号(u16)作为参数:


#![allow(unused)]
fn main() {
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
let socket = SocketAddr::new(ip, 8080);
}

此外,还有以下几个方法:

  • is_ipv4(): 是否是ip v4套接字地址
  • is_ipv6(): 是否是ip v6套接字地址
  • ip(): 返回IP地址
  • port(): 返回端口号
  • set_ip(): 修改IP地址
  • set_port(): 修改端口号

SocketAddr封装的代表ipv4套接字的SocketAddrV4也很简单直接,可由代表ipv4套接字的字符串解析得到,也可由new()方法创建,其也具有ip()、port()、set_ip()以及set_port()这几个方法。


#![allow(unused)]
fn main() {
use std::net::{Ipv4Addr, SocketAddrV4};

let socket = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 8080);

assert_eq!("127.0.0.1:8080".parse(), Ok(socket));
assert_eq!(socket.ip(), &Ipv4Addr::new(127, 0, 0, 1));
assert_eq!(socket.port(), 8080);
}

tokio::net::TcpListener

TcpListener代表服务端套接字,可使用bind()方法指定要绑定的地址,bind()之后再await,即可开始监听。

use tokio::net::TcpListener;

#[tokio::main]
async fn main(){
  let listener = TcpListener::bind("127.0.0.1:8888").await.unwrap();
}

这里的listener代表的是服务端负责监听的套接字。

注意,TcpListener::bind()默认会开启TCP的地址重用选项(SO_REUSEADDR)。如果想要修改该选项或设置其它TCP选项,应使用TcpSocket来创建套接字并设置选项,然后再调用bind()方法得到监听套接字。

得到监听套接字之后,可使用accept()去接收来自客户端的连接请求。accept()会阻塞(等待),直到有新的客户端发起连接请求。

accept()成功,表示和客户端之间成功建立TCP连接(连接进入Established状态),同时它会返回一个新的套接字(TcpStream)和代表客户端的套接字地址(SocketAddr)。可通过该TcpStream和客户端传输数据,可通过该SocketAddr获取客户端的地址和端口信息。如果要获取本地套接字地址相关的信息,可使用listener的local_addr()方法。

通常来说,会在一个无限循环中去accept(),这样可以保证多次接收客户端的连接请求。此外,一般也会为每一个accept()成功后返回的TcpStream去分配一个独立的线程或异步任务,这样可以异步地和每个客户端进行通信,且不影响监听套接字继续监听更多的客户端连接请求。

因此,tcp编程的服务端最基本的处理模式大致如下:

async fn main(){
    let listener = TcpListener::bind("127.0.0.1:8888").await.unwrap();

    loop {
        let (client, client_sock_addr) = listener.accept().await.unwrap();
        tokio::spawn(async move {
          // 该任务负责处理client
        });
    }
}

此外,tokio的监听套接字可和标准库的监听套接字(std::TcpListener)来回转换。由于tokio只提供了成品套接字,无法设置很多的套接字选项,因此如果需要修改或设置某些套接字选项,需要先构建标准库的套接字并设置选项,然后使用from_std()将标准库套接字转换为tokio的套接字。与from_std()对应的是into_std()

tokio::net::TcpSocket

TcpSocket用于创建和设置套接字选项,它是未进行连接的套接字,可通过bind()和listen()操作得到服务端的监听套接字,可通过connect()得到客户端的套接字。

例如,创建监听套接字,下面的操作等价于TcpListener.bind()操作,它将监听127.0.0.1:8080端口:

use tokio::net::TcpSocket;

#[tokio::main]
async fn main() {
    let addr = "127.0.0.1:8080".parse().unwrap();
    let socket = TcpSocket::new_v4().unwrap();
    socket.set_reuseaddr(true).unwrap();
    socket.bind(addr).unwrap();

    let listener = socket.listen(1024).unwrap();
}

下面的操作等价于TcpStream::connect()操作,它将连接127.0.0.1:8080并返回该连接的TcpStream:

use tokio::net::TcpSocket;

#[tokio::main]
async fn main() {
    let addr = "127.0.0.1:8080".parse().unwrap();

    let socket = TcpSocket::new_v4().unwrap();
    let stream = socket.connect(addr).await.unwrap();
}

TcpStream

TcpStream代表客户端和服务端之间已经建立的可以进行数据通信的TCP连接。当然,TcpStream也提供了connect()方法来方便地建立和TCP服务端的连接。


#![allow(unused)]
fn main() {
let mut stream = TcpStream::connect("127.0.0.1:8080").await.unwrap();
}

TcpStream用于客户端和服务端的通信,因此可对其进行读和写。读操作表示接收来自对端发送过来的数据,写操作表示将数据通过TCP连接发送给对端。但是,通常会使用tokio::io::AsyncReadExttokio::io::AsyncWriteExt提供的读写API来读写TcpStream,因尚未介绍tokio::io,因此先跳过相关的读写操作。

TcpStream本身也提供了和读写相关的一些api:

  • readable(): 等待TcpStream有数据可读
  • writable(): 等待TcpStream可写入数据
  • ready(): 类似Linux的select系统调用,注册可读、可写、读写关闭等事件后等待这些事件的出现
  • try_read(): 尝试以不等待的方式读取TcpStream
  • try_read_buf(): 尝试以不等待的方式读取TcpStream,并将读取成功的数据追加到给定的buf中
    • 和try_read()不同的是,try_read()每次读取数据后都会从前向后覆盖buf的字节,而try_read_buf()则是将读取的数据追加到buf的尾部
  • try_read_vectored(): 尝试以不等待的方式读取TcpStream,并将读取成功的数据分别填充到给定的一个或多个buf中
    • 例如,给定了两个64K大小的buf,读取了100K数据,则前64K填充到第一个buf中,剩余的36K填充到第二个buf中
  • try_write(): 尝试以不等待的方式写入TcpStream
  • try_write_vectored(): 尝试以不等待的方式写入TcpStream,写入的数据源来自于给定的一个或多个buf
  • peek(): 从TcpStream中读取数据,但不消费TcpStream中本次读取的数据。即,peek后还可以再次读取这部分数据
  • split(): 将TcpStream的读和写进行分离,得到的读、写两端不可跨线程(或任务)
  • into_split(): 将TcpStream的读和写进行分离,得到的读、写两端可跨线程(或任务)

稍后将简单介绍这些和读写相关的API的基本用法。

除了以上和IO相关的API,TcpSteam还提供了几个TCP连接选项设置的API:

  • set_linger(): 修改TCP连接的SO_LINGER选项。在关闭连接时如果仍有未发送数据(比如仍然在缓冲等待着更多数据进入),设置该选项决定是否要等待一段时间(期待后续会将缓冲的数据发送出去)才允许关闭TCP连接。若不设置该选项,则默认不等待
  • linger(): 获取linger设置的值
  • set_nodelay(): 修改TCP连接的TCP_NODELAY选项。设置该选项后,写入TcpStream的数据都将立即发送,而不会缓冲并等待凑够数据后才发送
  • nodelay(): 是否设置了nodelay选项

再来介绍TcpStream提供的和读写相关的API。

通常,读相关的操作(try_read、peek等)会结合readable()来使用,写相关的操作(try_write)会结合writable()来使用。但是注意,即便readable()、writable()的返回分别代表了可读和可写,但这个可读、可写的就绪事件并不能确保真的可读可写,因此读、写时要做好判断。

例如,readable()结合try_read():

use tokio::net::TcpStream;
use std::io;

#[tokio::main]
async fn main() {
    let stream = TcpStream::connect("127.0.0.1:8080").await.unwrap();
    let mut msg = vec![0; 1024];

    loop {
        // 等待可读事件的发生
        stream.readable().await.unwrap();

        // 即便readable()返回代表可读,但读取时仍然可能返回WouldBlock
        match stream.try_read(&mut msg) {
            Ok(n) => {    // 成功读取了n个字节的数据
                msg.truncate(n);
                break;
            }
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                continue;
            }
            Err(e) => {
                return;
            }
        }
    }

    println!("GOT = {:?}", msg);
}

当然,读写操作也可以结合ready()来使用,调用ready()时可注册感兴趣的事件,当注册的事件之一发生之后,ready()将返回Ready结构体,Ready结构体有一些布尔判断方法,用来判断某个事件是否发生。

例如:

use tokio::io::Interest;
use tokio::net::TcpStream;
use std::io;

#[tokio::main]
async fn main() {
    let stream = TcpStream::connect("127.0.0.1:8080").await.unwrap();

    loop {
        // 注册可读和可写事件,并等待事件的发生
        let ready = stream.ready(Interest::READABLE | Interest::WRITABLE).await.unwrap();

        // 如果注册的事件中,发生了可读事件,则执行如下代码
        if ready.is_readable() {
            let mut data = vec![0; 1024];
            match stream.try_read(&mut data) {
                Ok(n) => {
                    println!("read {} bytes", n);
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    continue;
                }
                Err(e) => {
                    return;
                }
            }
        }

        // 如果注册的事件中,发生了可写事件,则执行如下代码
        if ready.is_writable() {
            match stream.try_write(b"hello world") {
                Ok(n) => {
                    println!("write {} bytes", n);
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    continue
                }
                Err(e) => {
                    return;
                }
            }
        }
    }
}

peek()可读取TcpStream中的数据,但是和其它读取操作不同,peek()读取之后不会消费TcpStream中的数据。

use tokio::net::TcpStream;
use tokio::io::AsyncReadExt;

#[tokio::main]
async fn main() {
    let mut stream = TcpStream::connect("127.0.0.1:8080").await.unwrap();
    let mut b1 = [0; 10];
    let mut b2 = [0; 10];

    let n = stream.peek(&mut b1).await.unwrap();
    let n1 = stream.read(&mut b2[..n]).await.unwrap();
}

比较关键的是split()方法。TCP连接是全双工通信的,无论是TCP连接的客户端还是服务端,每一端都可以进行读操作和写操作。为了方便描述,此处将其称为读端和写端。即,客户端有读端和写端,服务端也有读端和写端。

通过TcpStream,可进行读操作,也可以进行写操作,正如前面几个示例代码所示。但是,通过TcpStream同时进行读写有时候会很麻烦,甚至无解。很多时候,需要将TcpStream的读端和写端进行分离,然后将分离的读、写两端放进独立的异步任务中去执行读或写操作(此时需跨线程),即一个线程(或异步任务)负责读,另一个线程(或异步任务)负责写。

split()和into_split()正是用来分离TcpStream的读写两端的。

split()可将TcpStream分离为ReadHalf和WriteHalf,ReadHalf用于读,WriteHalf用于写。


#![allow(unused)]
fn main() {
let mut conn = TcpStream::connect("127.0.0.1:8888").await.unwrap();
let (mut read_half, mut write_half) = conn.split();
}

split()并没有真正将TcpStream的读写两端进行分离,仅仅只是引用TcpStream中的读端和写端。因此,split()得到的读写两端只能在当前任务中进行读写操作,不允许跨线程跨任务。

into_split()是split()的owned版,分离后可得到OwnedReadHalf和OwnedWriteHalf。它是真正地分离TcpStream的读写两端,它会消费掉TcpStream。OwnedReadHalf和OwnedWriteHalf可跨任务进行读写操作。


#![allow(unused)]
fn main() {
let conn = TcpStream::connect("127.0.0.1:8888").await.unwrap();
let (mut read_half, mut write_half) = conn.into_split();
}

请记住TcpStream的split()into_split()方法,这两个方法在tokio网络编程时非常常用。