代码基于tag: v1.0.8
在evio的实现中,没有区分主的reactor和从的reactor,每个reactor均可以处理建立连接、读写数据等操作。
数据结构
type Events struct {
// 需要建立多少个reactor来处理请求
NumLoops int
// 在多个reactor中起负载均衡的作用
LoadBalance LoadBalance
// 接收连接时的回调
Serving func(server Server) (action Action))
// 连接关闭时的回调
Closed func(c Conn, err error) (action Action)
// 收到数据时候都回调,其中参数in为收到的数据
Data func(c Conn, in []byte) (out []byte, action Action)
}
type loop struct {
poll *internal.Poll // epoll or kqueue
packet []byte // reactor的read buffer
fdconns map[int]*conn // fd -> conn
}
type conn struct {
fd int // 连接对应的fd
out []byte // 发送的buffer
ctx interface{} // user-defined context,使用方式可参考下面具体使用一节
loop *loop // connected loop
}
代码实现
工作具体流程:
- 初始化reactor(poll),将listener的fd通过epoll_ctl加入到读取事件的队列中
- 当收到连接请求时,首先判断是否在 loop.fdconns 中,如果在,则是已存在的连接;否则新建连接
- 如果新建连接,则通过 accept 函数新建fd,并将fd加入到对应epoll的监听队列中
- 如果是已经存在的连接,则
- 如果相应conn中的发送buffer有数据,发送数据
- 否则,读取数据,并调用 event.Data(c Conn, in []byte) (out []byte, action Action)来处理数据,如果返回的out不为空,则将对应conn的fd改为epoll监听其读写事件是否就绪,在下次conn的发送缓冲区能够写入数据时,将数据写入发送缓冲区
建立连接
初始化poll的代码如下:
// evio.go#Serve
ln.ln, err = net.Listen(ln.network, ln.addr) // 建立连接
ln.system() // 将连接对应的fd修改为非阻塞态
// evio_unix.go#serve
l := &loop{
idx: i,
poll: internal.OpenPoll(),
packet: make([]byte, 0xFFFF),
fdconns: make(map[int]*conn),
}
for _, ln := range listeners {
l.poll.AddRead(ln.fd)
}
首先通过net.Listen
函数建立了连接,并将fd修改为了非阻塞状态,修改为非阻塞状态是因为可能有多个reactor对该listener调用了accept方法,如果为阻塞态,则只有一个reactor能够成功从accept中返回,别的reactor就会阻塞在accept函数上。(如果对阻塞/非阻塞的问题有疑问可参考 网络io
internal.OpenPoll()
方法内部调用了epoll_create
方法建立了epoll的fd。对于每个listener,都调用了 AddRead
方法去监听读事件的发生。
处理事件
处理事件的代码段如下:
// evio_unix.go
func loopRun(s *server, l *loop) {
...
l.poll.Wait(func(fd int, note interface{}) error { // 当监听事件发生时,回调该传入的函数
if fd == 0 {
return loopNote(s, l, note)
}
c := l.fdconns[fd]
switch {
case c == nil:
return loopAccept(s, l, fd) // 当fd不在poll中的fdconns时,证明该fd为监听fd,则调用accept函数来建立新连接
case !c.opened:
return loopOpened(s, l, c)
case len(c.out) > 0:
return loopWrite(s, l, c) // 如果fd对应的conn的发送缓冲区不为空,则发送该缓冲区中的数据
case c.action != None:
return loopAction(s, l, c)
default:
return loopRead(s, l, c) // 否则,执行读取操作
}
})
}
对于每一个reactor,均会开启一个goroutine调用loopRun
函数来处理事件,(*Poll).Wait
函数内部调用了epoll_wait
/kevent
来监听事件,当事件发生时,回调func(fd int, note interface{}) error
方法。由于在初始化的时候,对于listener的监听事件没有加入l.fdconns
中,因此调用回调函数时,如果发现 l.fdconns[fd] == nil
时,则证明该事件是发生在listener的fd中,是有新的连接请求发生。
下面介绍下 loopAccept
、loopRead
以及loopWrite
方法
loopAccept
// evio_unix.go
func loopAccept(s *server, l *loop, fd int) error {
for i, ln := range s.lns {
if ln.fd == fd {
if len(s.loops) > 1 {
// 调用负载均衡算法确定该reactor是否要处理该连接请求
...
}
...
nfd, sa, err := syscall.Accept(fd) // 调用系统的accept函数去建立新的连接
if err != nil {
if err == syscall.EAGAIN { // 有可能该请求已经被别的reactor处理过了,会返回EAGAIN错误
return nil
}
return err
}
if err := syscall.SetNonblock(nfd, true); err != nil {
return err
}
c := &conn{fd: nfd, sa: sa, lnidx: i, loop: l}
c.out = nil
l.fdconns[c.fd] = c
l.poll.AddReadWrite(c.fd) // 将新建立的连接加入到epoll的监听事件中
atomic.AddInt32(&l.count, 1)
break
}
}
return nil
}
由于有多组reactor同时在监听多组listener,每当出现新的连接请求时,每个reactor都会被唤醒去处理该连接,因此同时间会有多个loop去调用loopAccept
。这里需要注意的是,对于这些监听的fd,即listener,在初始化的时候都被设置成了非阻塞模式,因此,即使有多个loop同时对同一fd调用了accept
,只有一个会成功并且其余的也不会被阻塞,而只会返回 EAGAIN
错误。
成功建立连接后,将新连接的fd设为非阻塞模式,添加epoll的监听事件,并新建conn结构放入loop.fdconns
中以供后续使用。
loopRead
// evio_unix.go
func loopRead(s *server, l *loop, c *conn) error {
var in []byte
n, err := syscall.Read(c.fd, l.packet) // 将数据读入l.packet中
if n == 0 || err != nil {
if err == syscall.EAGAIN { // 如果 EAGAIN,证明无数据需要读,返回
return nil
}
return loopCloseConn(s, l, c, err) // 出现别的错误则直接关闭连接
}
in = l.packet[:n]
if !c.reuse {
in = append([]byte{}, in...)
}
if s.events.Data != nil {
out, action := s.events.Data(c, in) // 调用 Events.Data 方法处理请求
c.action = action
if len(out) > 0 { // 如果处理的结果不是空,则将数据加入对应连接的发送缓冲区中
c.out = append(c.out[:0], out...)
}
}
if len(c.out) != 0 || c.action != None {
l.poll.ModReadWrite(c.fd) // 将对应fd的监听事件修改为读写
}
return nil
}
对于读请求,会调用系统函数read
方法将数据从对应socket的接收缓冲区读入loop.packet
中。成功读取后,会调用用户自定义的Events.Data
方法处理数据,如果处理的结果不为空,则将数据先写入conn
结构中的out
中,并将该fd的监听事件修改为读写。不直接将数据发送的原因是如果该socket的发送缓冲区为满的情况下,会发送失败,造成不必要的消耗。因此选择还是依赖epoll下次的「可发送」事件的触发去调用write
方法。
loopWrite
// evio_unix.go
func loopWrite(s *server, l *loop, c *conn) error {
if s.events.PreWrite != nil {
s.events.PreWrite()
}
n, err := syscall.Write(c.fd, c.out) // 调用write方法
if err != nil {
if err == syscall.EAGAIN {
return nil
}
return loopCloseConn(s, l, c, err)
}
if n == len(c.out) { // 如果全部发送完成,清空c.out
// release the connection output page if it goes over page size,
// otherwise keep reusing existing page.
if cap(c.out) > 4096 {
c.out = nil
} else {
c.out = c.out[:0]
}
} else {
c.out = c.out[n:]
}
if len(c.out) == 0 && c.action == None {
l.poll.ModRead(c.fd) // 如果发送完成,将监听事件修改为只监听读事件
}
return nil
}
具体使用
具体的使用就拿examples/http-server/main.go来举例,该demo是为了搭建一个能够处理http请求的服务,处理请求的方法如下
events.Data = func(c evio.Conn, in []byte) (out []byte, action evio.Action) {
if in == nil {
return
}
is := c.Context().(*evio.InputStream)
data := is.Begin(in)
if noparse && bytes.Contains(data, []byte("\r\n\r\n")) {
// for testing minimal single packet request -> response.
out = appendresp(nil, "200 OK", "", res)
return
}
// process the pipeline
var req request
for {
leftover, err := parsereq(data, &req)
if err != nil {
// bad thing happened
out = appendresp(out, "500 Error", "", err.Error()+"\n")
action = evio.Close
break
} else if len(leftover) == len(data) {
// request not ready, yet
break
}
// handle the request
req.remoteAddr = c.RemoteAddr().String()
out = appendhandle(out, &req)
data = leftover
}
is.End(data)
return
}
// evio.go
// Begin accepts a new packet and returns a working sequence of
// unprocessed bytes.
func (is *InputStream) Begin(packet []byte) (data []byte) {
data = packet
if len(is.b) > 0 {
is.b = append(is.b, data...)
data = is.b
}
return data
}
首先,tcp是个面向字节流的协议,因此Data的参数in []byte
很可能不是一个完整的http报文,而只是其中的一部分,因此使用了conn.ctx
数据结构来存放了之前收到的不完整的http数据,在上面代码中即为is
变量。在每次收到数据时,先调用(*InputStream).Begin
方法来将之前未处理的数据和这次新接收的数据(in []byte
)组合起来,再进行处理。如果是个完整的http连接(即包含[]byte("\r\n\r\n")
),则返回200 OK
;否则如果数据依旧合法但不完整,则将接收到的数据继续存在conn.ctx
中等待下次数据来的时候一块进行处理。
Comments | NOTHING