通过redis学网络(1)-用go基于epoll实现最简单网络通信框架

通过,redis,网络,go,基于,epoll,实现,简单,网络通信,框架 · 浏览次数 : 182

小编点评

# golang epoll server example **go code:** ```go package main import ( "epoll" ) func main() { // create an epoll server server, err := epoll.NewServer() if err != nil { log.Fatal(err) } defer server.Close() // listen for events on our server events, err := server.WaitEvents() if err != nil { log.Fatal(err) } // handle events for each connected client for event := events { if event.Type == epoll.EPOLLIN { connInf, ok := server.ConnMap.Load(int(event.Event)) if ok { log.Printf("new client connected: %d\n", connInf.FD) conn, err := server.NewConn(connInf.FD) if err != nil { log.Fatal(err) } conn.SetRead(func(p []byte) (n int, err error) { return n, err }) } } } } ``` **Explanation:** 1. We create an epoll server using `epoll.NewServer()`. 2. We listen for events on our server using `server.WaitEvents()`. 3. We handle events for each connected client. We check the event type and the client's ID. 4. If the event is `EPOLLIN`, we get the connected client's `FD` from the `server.ConnMap`. 5. We use `server.NewConn` to create a new client connection. 6. We set the read event for the new client connection. 7. We handle events for each connected client. We check the event type and the client's ID. 8. If the event is `EPOLLIN`, we get the connected client's `FD` from the `server.ConnMap`. 9. We use `server.NewConn` to create a new client connection. 10. We set the read event for the new client connection. **Additional notes:** * We use the `epoll.EPOLLIN` event type to handle client connections. * The `server.ConnMap` allows us to access the `FD` of connected clients. * We use the `server.NewConn` function to create new client connections. * We set the `Read` event for the new client connection. * We handle events for each connected client.

正文

image.png

本系列主要是为了对redis的网络模型进行学习,我会用golang实现一个reactor网络模型,并实现对redis协议的解析。

系列源码已经上传github

https://github.com/HobbyBear/tinyredis/tree/chapter1

redis的网络模型是基于epoll实现的,所以这一节让我们先基于epoll,实现一个最简单的服务端客户端通信模型。在实现前,先来简单的了解下epoll的原理。

为什么不用golang的原生的netpoll网络框架呢,这是因为netpoll框架虽然底层也是基于epoll实现,但是它提供给开发人员使用网络io方式依然是同步阻塞模式,一个连接单独的拿给一个协程去处理,为了更加真实的感受下redis的网络模型,我们不用netpoll框架,而是自己写一个非阻塞的网络模型。

epoll 网络通信原理

通常情况下服务端的处理客户端请求的逻辑是客户端每发起一个连接,服务端就单独起一个线程去处理这个连接的请求,对于go应用程序而言,则是启用一个协程去处理这个连接。 而采用epoll相关的api后,能够让我们在一个线程或者协程里去处理多个连接的请求。

一个套接字连接对应一个文件描述符,当收到客户端的连接请求时,可以将对应的文件描述符加入到epoll实例关注的事件中去。

在golang里,可以通过syscall.EpollCreate1 去创建一个epoll实例。

func EpollCreate1(flag int) (fd int, err error) 

其返回结果的fd就代表epoll实例的fd,当收到客户端的连接请求时,便可以将客户端连接的fd,通过EpollCtl 加入到epoll实例感兴趣的事件当中。

func EpollCtl(epfd int, op int, fd int, event *EpollEvent) (err error) 

EpollCtl 方法参数的epfd则是EpollCreate1 返回的fd,EpollCtl的第二个参数则是代表客户端连接的fd,通过我们在获取到客户端连接后,后续的行为便是查看客户端是否有数据发送过来或者往客户端发送数据,这些在epoll api里用event事件去表示,分别对应了读event和写event,这便是EpollCtl第三个参数所代表的含义。

将这些感兴趣事件添加到epoll实例中后,就代表epoll实例后续会监听这些连接的读写事件的到达,那么读写事件到达后,用户程序又是如何知道的呢,这就要提到epoll相关的另一个api,EpollWait。

func EpollWait(epfd int, events []EpollEvent, msec int) (n int, err error) 

EpollWait的第二个参数是一个事件数组,用户应用程序调用EpollWait时传入一个固定长度的事件数组,然后EpollWait会将这个数组尽可能填满,这样用户程序便能知道有哪些事件类型到达了,EpollEvent类型如下所示:

type EpollEvent struct {
	Events uint32
	Fd     int32
	Pad    int32
}

其中fd则代表这些事件所关联的客户端连接的fd,通过这个fd,我们便可以对对应连接进行读写操作了。

而Events是个枚举类型,比较常用的枚举以及含义如下:

类型 解释
EPOLLIN 表示文件描述符可读。
EPOLLRDHUP 表示 TCP 连接的远程端点关闭或半关闭连接
EPOLLET 表示使用边缘触发模式来监听事件
EPOLLOUT 表示文件描述符可写
EPOLLERR 表示文件描述符发生错误时发生,这个事件不通过EpollCtl添加也能触发
EPOLLHUP 与EPOLLRDHUP类似同样表示连接关闭,在不支持EPOLLRDHUP的linux版本会触发,这个事件不通过EpollCtl添加也能触发

虽然epoll event还有其他类型,不过一般情况下监控这几种类型就足够了,golang的netpoll框架在添加连接的文件描述符时事件时也只添加了这几种类型。netpoll的部分源码如下:

func netpollopen(fd uintptr, pd *pollDesc) int32 {
	var ev epollevent
	ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
	*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
	return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

如何用golang创建基于epoll的网络框架

了解完epoll的一些概念以后,现在来看下我们需要实现的网络框架模型是怎样的。我们先实现一个最简单的网络通信框架,客户端发送来消息,然后服务端打印收到的消息。

Pasted image 20230605160424.png

如上图所示,我们收到新的连接后,会调用epoll实例的EpollCtl方法将连接的可读事件添加到epoll实例中,接着调用EpollWait方法等待客户端再次发送消息时,让连接变为可读。

下面是程序的效果测试结果

效果测试

效果演示.png

启动了两个终端,其中右边的终端连接上redis以后,发送了1231,然后左边的终端收到后将收到的消息打印出来。

go代码实现

接着,我们来看看实际代码编写逻辑。

我们定义一个Server的结构体来代表epoll的server。

Conn是对golang原生连接类型net.Conn的包装,。

poll结构体是封装了对epoll api的调用。

type Server struct {  
   Poll     *poll  
   addr     string  
   listener net.Listener  
   ConnMap  sync.Map  
}

type Conn struct {  
   s    *Server  
   conn *net.TCPConn  
   nfd  int  
}


type poll struct {
	EpollFd int
}

接着来看下如何启动一个Server,NewServer是返回一个Server实例,Server 调用Run方法后,才算Server正式启动了起来。

在Run 方法里,构建监听连接的listener,构建一个epoll实例,用于后续对事件的监听,同时把监听握手连接和处理连接可读数据分成了两个协程分别用accept方法,和handler方法执行。

func NewServ(addr string) *Server {  
   return &Server{addr: addr, ConnMap: sync.Map{}}  
}  
  
func (s *Server) Run() error {  
   listener, err := net.Listen("tcp", s.addr)  
   if err != nil {  
      return err  
   }  
   s.listener = listener  
   epollFD, err := syscall.EpollCreate1(0)  
   if err != nil {  
      return err  
   }  
   s.Poll = &poll{EpollFd: epollFD}  
   go s.accept()  
   go s.handler()  
   ch := make(chan int)  
   <-ch  
   return nil  
}

accept 方法里执行的逻辑就是将握手完成的链接从全连接队列里取出来,将其连接的文件描述符和连接存储到一个map里, 然后将对应的文件描述符通过epoll的epollCtl 系统调用监听它的可读事件,后续客户端再使用这个连接发送数据时,epoll就能监听到了。

func (s *Server) accept() {  
   for {  
      acceptConn, err := s.listener.Accept()  
      if err != nil {  
         return  
      }  
      var nfd int  
      rawConn, err := acceptConn.(*net.TCPConn).SyscallConn()  
      if err != nil {  
         log.Error(err.Error())  
         continue  
      }  
      rawConn.Control(func(fd uintptr) {  
         nfd = int(fd)  
      })  
      // 设置为非阻塞状态  
      err = syscall.SetNonblock(nfd, true)  
      if err != nil {  
         return  
      }  
      err = s.Poll.AddListen(nfd)  
      if err != nil {  
         log.Error(err.Error())  
         continue  
      }  
      c := &Conn{  
         conn: acceptConn.(*net.TCPConn),  
         nfd:  nfd,  
         s:    s,  
      }  
      s.ConnMap.Store(nfd, c)  
   }  
}

handler里的逻辑则是通过epoll Wait系统调用等待可读事件产生,到达后,根据事件的文件描述符找到对应连接,然后读取对应连接的数据。

func (s *Server) handler() {  
   for {  
      events, err := s.Poll.WaitEvents()  
      if err != nil {  
         log.Error(err.Error())  
         continue  
      }  
      for _, e := range events {  
         connInf, ok := s.ConnMap.Load(int(e.FD))  
         if !ok {  
            continue  
         }  
         conn := connInf.(*Conn)  
         if IsClosedEvent(e.Type) {  
            conn.Close()  
            continue  
         }  
         if IsReadableEvent(e.Type) {  
            buf := make([]byte, 1024)  
            rd, err := conn.Read(buf)  
            if err != nil && err != syscall.EAGAIN {  
               conn.Close()  
               continue  
            }  
            fmt.Println("收到消息", string(buf[:rd]))  
         }  
      }  
   }  
}

主干代码是比较容易理解的,但是用golang使用epoll 时有几个点 需要注意下:

第一点是IsReadableEvent 的判断方式,epoll的每个event 都有一个位掩码,位掩码是什么意思呢?比如EPOLLIN 的值 是0x1,二进制就是00000001,EPOLLHUP 的值是0x10,二进制表示是00010000,那么epoll wait系统调用的event要如何同时表示同一个文件描述符同时拥有这两个事件呢? epoll 的event会将对应的位掩码设置为和对应事件一致,比如同时拥有EPOLLIN和EPOLLHUP,那么event的值将会是00010001,所以利用与位运算是不是就能判断event是否具有某个事件了。因为1只有与1进行与运算结果才为1。

func IsReadableEvent(event uint32) bool {
	if event&syscall.EPOLLIN != 0 {
		return true
	}
	return false
}

第二点是如何读取连接的数据, 我们后续要达到的目的是在同一个事件循环里能处理多个连接,所以要保证读取连接中的数据时不能阻塞,通过调用golang的net.Conn下的read方法是阻塞的,其read实现最终会调用到下面👇🏻👇🏻👇🏻的这个方法。

func (fd *FD) Read(p []byte) (int, error) {  
   if err := fd.readLock(); err != nil {  
      return 0, err  
   }  
   defer fd.readUnlock()  
   if len(p) == 0 {  
      // If the caller wanted a zero byte read, return immediately  
      // without trying (but after acquiring the readLock).      // Otherwise syscall.Read returns 0, nil which looks like      // io.EOF.      // TODO(bradfitz): make it wait for readability? (Issue 15735)      return 0, nil  
   }  
   if err := fd.pd.prepareRead(fd.isFile); err != nil {  
      return 0, err  
   }  
   if fd.IsStream && len(p) > maxRW {  
      p = p[:maxRW]  
   }  
   for {  
      n, err := ignoringEINTRIO(syscall.Read, fd.Sysfd, p)  
      if err != nil {  
         n = 0  
         if err == syscall.EAGAIN && fd.pd.pollable() {  
            if err = fd.pd.waitRead(fd.isFile); err == nil {  
               continue  
            }  
         }  
      }  
      err = fd.eofError(n, err)  
      return n, err  
   }  
}

这个方法会在for循环中判断系统调用syscall.Read 的返回,如果是syscall.EAGAIN 那么会让当前协程睡眠,等待被唤醒。

syscall.EAGAIN 错误是在非阻塞io进行读写时才有可能产生的,在读取数据时,如果发现读缓冲区没有数据到达,则返回这个syscall.EAGAIN错误,在写入数据时,如果写缓冲区满了,也会返回这个错误。

既然golang的net.Conn下的read方法是阻塞的,那么我们就自己实现下conn的Read方法。

func (c *Conn) Read(p []byte) (n int, err error) {  
   rawConn, err := c.conn.SyscallConn()  
   if err != nil {  
      return 0, err  
   }  
   rawConn.Read(func(fd uintptr) (done bool) {  
      n, err = syscall.Read(int(fd), p)  
      if err != nil {  
         return true  
      }  
      return true  
   })  
   return  
}

👆🏻👆🏻的Read方法是我们自定义的Conn类型实现的Read方法,原生的连接类型是net.Conn,它有一个SyscallConn 能够获取到更加底层的连接类型,从这个类型能够获取到该网络连接的文件描述符fd,我们通过直接调用系统调用syscall.Read来从该网络连接读取数据。 并且碰到错误则直接返回。后续 syscall.EAGAIN错误会交给上层handler方法去进行处理。

总结

这节算是用golang去演示了下如何对epoll api的调用,并且能够实现最简单的客户端服务端通信,下一节我会讲解redis的网络模型是怎么样的,你可以从中了解到经常说的redis的单线程具体是指什么,了解到reactor网络模型是怎样的?

与通过redis学网络(1)-用go基于epoll实现最简单网络通信框架相似的内容:

通过redis学网络(1)-用go基于epoll实现最简单网络通信框架

![image.png](https://img2023.cnblogs.com/blog/1382767/202306/1382767-20230607105418219-574417823.png) > 本系列主要是为了对redis的网络模型进行学习,我会用golang实现一个reactor网络

通过redis学网络(2)-redis网络模型

> 本系列主要是为了对redis的网络模型和集群原理进行学习,我会用golang实现一个reactor网络模型,并实现对redis协议的解析。 系列源码已经上传github ```go https://github.com/HobbyBear/tinyredis/tree/chapter2 ```

[转帖]如何应对变慢的Redis-波动的响应延迟

Redis突然变慢了,如何排查呢 文章目录 问题认定应对方案Redis自身操作特性慢查询命令过期key操作 文件系统AOF解决方案 操作系统Swap内存大页 CheckList 问题认定 如何判断Redis是不是真的变慢了?通过Redis的响应延迟 大部分时候,Redis延迟很低,某些时刻,Redi

一台服务器上部署 Redis 伪集群

哈喽大家好,我是咸鱼 今天这篇文章介绍如何在一台服务器(以 CentOS 7.9 为例)上通过 `redis-trib.rb` 工具搭建 Redis cluster (三主三从) `redis-trib.rb` 是一个基于 Ruby 编写的脚本,其功能涵盖了创建、管理以及维护 Redis 集群的各个

数据安全没保证?GaussDB(for Redis)为你保驾护航

摘要:GaussDB (for Redis)通过账号管理、权限隔离、高危命令禁删/重命名、安全IP免密登录、实例回收站等企业级特性,保障用户数据库数据和信息安全。 本文分享自华为云社区《数据安全没保证?GaussDB(for Redis)为你保驾护航》,作者: GaussDB 数据库。 近日,一些用

6步带你用Spring Boot开发出商城高并发秒杀系统

摘要:本博客将介绍如何使用 Spring Boot 实现一个简单的商城秒杀系统,并通过使用 Redis 和 MySQL 来增强其性能和可靠性。 本文分享自华为云社区《Spring Boot实现商城高并发秒杀案例》,作者:林欣。 随着经济的发展和人们消费观念的转变,电子商务逐渐成为人们购物的主要方式之

服务端应用多级缓存架构方案

## 一:场景 20w的QPS的场景下,服务端架构应如何设计? ## 二:常规解决方案 可使用分布式缓存来抗,比如redis集群,6主6从,主提供读写,从作为备,不提供读写服务。1台平均抗3w并发,还可以抗住,如果QPS达到100w,通过增加redis集群中的机器数量,可以扩展缓存的容量和并发读写能

[转帖]Redis的高并发及高可用,到底该如何保证?

https://zhuanlan.zhihu.com/p/404481762 一、redis如何通过读写分享来承载读请求QPS超过10万+ 1、redis高并发跟整个系统的高并发之间的关系 redis,你要搞高并发的话,不可避免,要把底层的缓存搞得很好 mysql,高并发,做到了,那么也是通过一系列

真·Redis缓存优化—97%的优化率你见过嘛?

本文通过一封618前的R2M(公司内部缓存组件,可以认为等同于Redis)告警,由浅入深的分析了该告警的直接原因与根本原因,并根据原因提出相应的解决方法,希望能够给大家在排查类似问题时提供相应的思路。

[转帖]调整Redis定期任务的执行频率

https://help.aliyun.com/document_detail/142171.html 通过修改hz参数的值,您可以调整Redis执行定期任务的频率,从而改变Redis清除过期key、清理超时连接的效率。 Redis定期任务与hz参数的关系 为了定期检测资源和服务状态并根据预定策略执