从Go编程看IO多路复用Select
IO多路复用通过某种机制使进程监听某些文件描述符,当文件描述符中有读或写就绪时,进程能够收到系统内核发送的相应通知从而进行相应的IO操作;IO多路复用有:select、poll、epoll等模式,这里主要介绍select;select本质上也是同步IO,调用时阻塞自己,IO事件就绪后被唤醒返回负责读写操作;
在Go中其函数定义如下:
func Select(nfd int, r *FdSet, w *FdSet, e *FdSet, timeout
*Timeval) (n int, err error)
FdSet定义:
type FdSet struct {
Bits [16]int64
}
select函数实现IO多路复用,通过其参数通知内核:
1、关注的文件描述符
2、关心的文件描述符的哪种状态:可读、可写还是异常
3、等待时间,无限等待阻塞或是固定超时时间
函数参数
通过上面的介绍可以知道我们需要有这么几种参数传递给select函数,所关注的描述符,所关注的状态、等待时间;
函数参数具体含义:
nfd(maxfd): 文件描述符集合中要监听的文件描述符个数,0-(maxfd-1)为需要检测的文件描述符;
r(readfds): 读监控文件描述符集,监控文件描述符集的读变化,如文件描述符集中有文件可读即通过该参数回传有变化的描述符,清空无变化的描述符;
w(writefds): 写监控文件描述符集,监控文件描述符集的写变化,如文件描述符集中有文件可写即通过该参数回传有变化的描述符,清空无变化的描述符;
e(exceptfds): 异常监控文件描述符集,监控文件描述符集的异常,如文件描述符集中有文件异常即通过该参数回传有变化的描述符,清空无变化的描述符;
timeout参数: 传入nil时函数无限阻塞等待,整数值为超时时间;
上面三个文件描述符集合如无需关注某一类状态可传入nil,则select将不监控文件描述符的读、写或异常;
tcp连接中可只需关注是否可读即可;
函数返回:
通过函数返回可知这么两类信息:
1、准备好的文件描述符个数
2、具体哪些文件描述符处于就绪可读、可写或异常状态
函数值:
-1 发生错误
0 函数超时,当设置了超时时间,该时间内未有状态变化时
大于0 有满足读、写、异常的文件描述符,需检查文件描述符集
特别关注
每次函数返回时都会将文件描述符集FdSet中未发生任何事件的fd清空,每次调用select时都需将所关注的fd重新加入FdSet中;
可监控文件描述符个数取决于 FdSet中Bits的位长度,每个bit代表一个文件描述符,默认情况下Go中的定义为:Bits [16]int64,也就是一个8字节整数数组,数组长度为16,第一个数组元素可存储的文件描述符为:0-63,第二个为:64-127依次类推;此时最多可以监听的文件描述符数为1024个;
Select的相关问题:
1、内核将消息传递到用户空间需要执行系统拷贝,如监听了大量fd会导致性能下降
2、每次调用select都需要从用户态拷贝fd集合到内核态
3、每次调用select内核态都需要遍历传进来的所有fd集合
4、默认select支持的fd集合过小,只有1024;
5、轮询效率低,每次调用select、内核通知都需要轮询整个fd集合
Go中的代码实现:
func SelectIO(fd int) {
//读文件描述符集
fdReadSet := &syscall.FdSet{}
connect = &Connect{maxFd: fd, childsMap: map[string]int{}}
for {
FD_ZERO(fdReadSet)
FD_SET(fd, fdReadSet) //socket文件描述符
for _, child := range connect.childsMap {
FD_SET(child, fdReadSet) //连接监听
if child > connect.maxFd {
connect.maxFd = child
} else {
}
}
n, err := syscall.Select(connect.maxFd+1, fdReadSet, nil, nil, nil)
if err != nil {
log.Println(err)
if err == syscall.EAGAIN { //非阻塞模型下资源限制或不满足条件返回eagain 异常 Resource temporarily unavailable
continue
}
}
log.Printf("n:%v,fdReadSet:%v", n, FD_ISSET(fd, fdReadSet))
//-1 出错 >0就绪的文件描述符数 0 超时
if n > 0 && FD_ISSET(fd, fdReadSet) {
//接受连接
nfd, naddr, err := syscall.Accept(fd) //阻塞模式下在此方法会阻塞
if err != nil {
log.Printf("接受连接出错:%v,%v", fd, err)
continue
}
//设置非阻塞
if err := syscall.SetNonblock(nfd, true); err != nil {
log.Fatal(err)
}
var addr = naddr.(*syscall.SockaddrInet4)
var ip = fmt.Sprintf("%d.%d.%d.%d:%d", addr.Addr[0], addr.Addr[1],
addr.Addr[2], addr.Addr[3], addr.Port)
log.Printf("新连接:%v", ip)
processConn(nfd, ip)
} else {
readMsg(fdReadSet)
}
}
}
文章首发地址:https://mp.weixin.qq.com/s/1co33_BaJEwqrSX4GxkYEA