代码基于tag: v1.0.8

在evio的实现中,没有区分主的reactor和从的reactor,每个reactor均可以处理建立连接、读写数据等操作。

数据结构

type Events struct {
  // 需要建立多少个reactor来处理请求
    NumLoops int
    // 在多个reactor中起负载均衡的作用
    LoadBalance LoadBalance
    // 接收连接时的回调
    Serving func(server Server) (action Action))
    // 连接关闭时的回调
    Closed func(c Conn, err error) (action Action)
    // 收到数据时候都回调,其中参数in为收到的数据
    Data func(c Conn, in []byte) (out []byte, action Action)
}
type loop struct {
    poll    *internal.Poll // epoll or kqueue
    packet  []byte         // reactor的read buffer
    fdconns map[int]*conn  // fd -> conn
}
type conn struct {
    fd         int              // 连接对应的fd
    out        []byte           // 发送的buffer
    ctx        interface{}      // user-defined context,使用方式可参考下面具体使用一节
    loop       *loop            // connected loop
}

代码实现

工作具体流程:

  1. 初始化reactor(poll),将listener的fd通过epoll_ctl加入到读取事件的队列中
  2. 当收到连接请求时,首先判断是否在 loop.fdconns 中,如果在,则是已存在的连接;否则新建连接
    1. 如果新建连接,则通过 accept 函数新建fd,并将fd加入到对应epoll的监听队列中
    2. 如果是已经存在的连接,则
      1. 如果相应conn中的发送buffer有数据,发送数据
      2. 否则,读取数据,并调用 event.Data(c Conn, in []byte) (out []byte, action Action)来处理数据,如果返回的out不为空,则将对应conn的fd改为epoll监听其读写事件是否就绪,在下次conn的发送缓冲区能够写入数据时,将数据写入发送缓冲区

建立连接

初始化poll的代码如下:

// evio.go#Serve
ln.ln, err = net.Listen(ln.network, ln.addr) // 建立连接
ln.system() // 将连接对应的fd修改为非阻塞态

// evio_unix.go#serve
l := &loop{
    idx:     i,
    poll:    internal.OpenPoll(),
    packet:  make([]byte, 0xFFFF),
    fdconns: make(map[int]*conn),
}
for _, ln := range listeners {
  l.poll.AddRead(ln.fd)
}

首先通过net.Listen函数建立了连接,并将fd修改为了非阻塞状态,修改为非阻塞状态是因为可能有多个reactor对该listener调用了accept方法,如果为阻塞态,则只有一个reactor能够成功从accept中返回,别的reactor就会阻塞在accept函数上。(如果对阻塞/非阻塞的问题有疑问可参考 网络io

internal.OpenPoll()方法内部调用了epoll_create方法建立了epoll的fd。对于每个listener,都调用了 AddRead方法去监听读事件的发生。

处理事件

处理事件的代码段如下:

// evio_unix.go
func loopRun(s *server, l *loop) {
    ...
    l.poll.Wait(func(fd int, note interface{}) error { // 当监听事件发生时,回调该传入的函数
        if fd == 0 {
            return loopNote(s, l, note)
        }
        c := l.fdconns[fd]
        switch {
        case c == nil:
            return loopAccept(s, l, fd)  // 当fd不在poll中的fdconns时,证明该fd为监听fd,则调用accept函数来建立新连接
        case !c.opened:
            return loopOpened(s, l, c)
        case len(c.out) > 0:
            return loopWrite(s, l, c)  // 如果fd对应的conn的发送缓冲区不为空,则发送该缓冲区中的数据 
        case c.action != None:
            return loopAction(s, l, c)
        default:
            return loopRead(s, l, c) // 否则,执行读取操作
        }
    })
}

对于每一个reactor,均会开启一个goroutine调用loopRun函数来处理事件,(*Poll).Wait 函数内部调用了epoll_wait/kevent来监听事件,当事件发生时,回调func(fd int, note interface{}) error方法。由于在初始化的时候,对于listener的监听事件没有加入l.fdconns中,因此调用回调函数时,如果发现 l.fdconns[fd] == nil时,则证明该事件是发生在listener的fd中,是有新的连接请求发生。

下面介绍下 loopAcceptloopRead以及loopWrite方法

loopAccept

// evio_unix.go
func loopAccept(s *server, l *loop, fd int) error {
    for i, ln := range s.lns {
        if ln.fd == fd {
            if len(s.loops) > 1 {
        // 调用负载均衡算法确定该reactor是否要处理该连接请求
                ...
            }
            ...
            nfd, sa, err := syscall.Accept(fd) // 调用系统的accept函数去建立新的连接
            if err != nil {
                if err == syscall.EAGAIN { // 有可能该请求已经被别的reactor处理过了,会返回EAGAIN错误
                    return nil
                }
                return err
            }
            if err := syscall.SetNonblock(nfd, true); err != nil {
                return err
            }
            c := &conn{fd: nfd, sa: sa, lnidx: i, loop: l}
            c.out = nil
            l.fdconns[c.fd] = c
            l.poll.AddReadWrite(c.fd) // 将新建立的连接加入到epoll的监听事件中
            atomic.AddInt32(&l.count, 1)
            break
        }
    }
    return nil
}

由于有多组reactor同时在监听多组listener,每当出现新的连接请求时,每个reactor都会被唤醒去处理该连接,因此同时间会有多个loop去调用loopAccept。这里需要注意的是,对于这些监听的fd,即listener,在初始化的时候都被设置成了非阻塞模式,因此,即使有多个loop同时对同一fd调用了accept ,只有一个会成功并且其余的也不会被阻塞,而只会返回 EAGAIN错误。

成功建立连接后,将新连接的fd设为非阻塞模式,添加epoll的监听事件,并新建conn结构放入loop.fdconns中以供后续使用。

loopRead

// evio_unix.go
func loopRead(s *server, l *loop, c *conn) error {
    var in []byte
    n, err := syscall.Read(c.fd, l.packet) // 将数据读入l.packet中
    if n == 0 || err != nil {
        if err == syscall.EAGAIN { // 如果 EAGAIN,证明无数据需要读,返回
            return nil
        }
        return loopCloseConn(s, l, c, err) // 出现别的错误则直接关闭连接
    }
    in = l.packet[:n]
    if !c.reuse {
        in = append([]byte{}, in...)
    }
    if s.events.Data != nil {
        out, action := s.events.Data(c, in) // 调用 Events.Data 方法处理请求
        c.action = action
        if len(out) > 0 { // 如果处理的结果不是空,则将数据加入对应连接的发送缓冲区中
            c.out = append(c.out[:0], out...)
        }
    }
    if len(c.out) != 0 || c.action != None {
        l.poll.ModReadWrite(c.fd) // 将对应fd的监听事件修改为读写
    }
    return nil
}

对于读请求,会调用系统函数read方法将数据从对应socket的接收缓冲区读入loop.packet中。成功读取后,会调用用户自定义的Events.Data方法处理数据,如果处理的结果不为空,则将数据先写入conn结构中的out中,并将该fd的监听事件修改为读写。不直接将数据发送的原因是如果该socket的发送缓冲区为满的情况下,会发送失败,造成不必要的消耗。因此选择还是依赖epoll下次的「可发送」事件的触发去调用write方法。

loopWrite

// evio_unix.go
func loopWrite(s *server, l *loop, c *conn) error {
    if s.events.PreWrite != nil {
        s.events.PreWrite()
    }
    n, err := syscall.Write(c.fd, c.out) // 调用write方法
    if err != nil {
        if err == syscall.EAGAIN {
            return nil
        }
        return loopCloseConn(s, l, c, err)
    }
    if n == len(c.out) { // 如果全部发送完成,清空c.out
        // release the connection output page if it goes over page size,
        // otherwise keep reusing existing page.
        if cap(c.out) > 4096 {
            c.out = nil
        } else {
            c.out = c.out[:0]
        }
    } else { 
        c.out = c.out[n:]
    }
    if len(c.out) == 0 && c.action == None {
        l.poll.ModRead(c.fd) // 如果发送完成,将监听事件修改为只监听读事件
    }
    return nil
}

具体使用

具体的使用就拿examples/http-server/main.go来举例,该demo是为了搭建一个能够处理http请求的服务,处理请求的方法如下

events.Data = func(c evio.Conn, in []byte) (out []byte, action evio.Action) {
   if in == nil {
      return
   }
   is := c.Context().(*evio.InputStream)
   data := is.Begin(in)
   if noparse && bytes.Contains(data, []byte("\r\n\r\n")) {
      // for testing minimal single packet request -> response.
      out = appendresp(nil, "200 OK", "", res)
      return
   }
   // process the pipeline
   var req request
   for {
      leftover, err := parsereq(data, &req)
      if err != nil {
         // bad thing happened
         out = appendresp(out, "500 Error", "", err.Error()+"\n")
         action = evio.Close
         break
      } else if len(leftover) == len(data) {
         // request not ready, yet
         break
      }
      // handle the request
      req.remoteAddr = c.RemoteAddr().String()
      out = appendhandle(out, &req)
      data = leftover
   }
   is.End(data)
   return
}

// evio.go
// Begin accepts a new packet and returns a working sequence of
// unprocessed bytes.
func (is *InputStream) Begin(packet []byte) (data []byte) {
    data = packet
    if len(is.b) > 0 {
        is.b = append(is.b, data...)
        data = is.b
    }
    return data
}

首先,tcp是个面向字节流的协议,因此Data的参数in []byte很可能不是一个完整的http报文,而只是其中的一部分,因此使用了conn.ctx数据结构来存放了之前收到的不完整的http数据,在上面代码中即为is 变量。在每次收到数据时,先调用(*InputStream).Begin方法来将之前未处理的数据和这次新接收的数据(in []byte)组合起来,再进行处理。如果是个完整的http连接(即包含[]byte("\r\n\r\n")),则返回200 OK;否则如果数据依旧合法但不完整,则将接收到的数据继续存在conn.ctx中等待下次数据来的时候一块进行处理。