Redis网络模型

redis · 浏览次数 : 0

小编点评

```go import ( "bufio" "errors" "fmt" "io" "net" "strconv" ) // RedisClient 封装用于连接type RedisClient struct type RedisClient struct { conn net.Conn writer *bufio.Writer reader *bufio.Reader } // main 函数 func main() { // 连接redis conn, err := net.Dial("tcp", "127.0.0.1:6379") if err != nil { fmt.Println("连接redis失败", err) } client := RedisClient{conn: conn, writer: bufio.NewWriter(conn), reader: bufio.NewReader(conn)} defer client.conn.Close() // 发送命令 client.sendRequest([]string{"set", "name", "方块"}) // zrange boards:2024-4 0 -1 client.sendRequest([]string{"zrange", "boards:2024-4", "0", "-1"}) // LRANGE tlist 0 -1 client.sendRequest([]string{"LRANGE", "tlist", "0", "-1"}) // 处理返回结果 response := client.handleResponse() // 打印结果 fmt.Printf("\n第一个操作数:%v\n", response) } // 写命令 func (client *RedisClient) writeCommand(s string) { client.conn.Write([]byte(s + "\r\")) } // 解析返回结果 func (client *RedisClient) handleResponse() interface{} { // 先读取第一个字符 r, _, _ := client.reader.ReadRune() flag := string(r) switch flag { case "+\": // 行内字符串 return client.ReadLine() case "-\": // 异常 return client.ReadLine() case "\": // 数字 line := client.ReadLine() res, _ := strconv.Atoi(line) return res case "$\": // 多行字符串 return client.readBulkString() default: return errors.New("错误") } } // 读取一行 func (client *RedisClient) ReadLine() string { bytes, _, _ := client.reader.ReadLine() return string(bytes) } // 读取到末尾 估计有点问题 func (client *RedisClient) ReadToEnd() string { var size = 1024 bytes := make([]byte, size) var temp = \"\"" for { n, err := client.reader.Read(bytes) temp += string(bytes[:n]) if err == io.EOF || n == 0 || n < size { break } } return temp } ``` **注意:** * 此代码需要在运行之前安装 `go get` 命令,并将 `redis/v8.6/cmd.exe` 放在系统路径中。 * 代码中的排版只是简单的格式化,您可以根据自己的需求进行调整。

正文

主从复制原理

  1. 建立连接

    1. 从节点在配置了 replicaof 配置了主节点的ip和port
    2. 从库执行replicaof 并发送psync命令
  2. 同步数据到从库

    1. 主库bgsave生成RDB文件,并发送给从库,同时为每一个slave开辟一块 replication buffer 缓冲区记录从生成rdb文件开始收到的所有写命令。
    2. 从库清空数据并加载rdb
  3. 发送新写的命令给从库

    1. 从节点加载 RDB 完成后,主节点将 replication buffer 缓冲区的数据(增量写命令)发送到从节点,slave 接收并执行,从节点同步至主节点相同的状态。
  4. 基于长连接传播

    1. 方便后续命令传输,除了写命令,还维持着心跳机制

网络模型

  • 阻塞IO

  • 非阻塞IO

    • 没啥用,还是需要等待数据准备就绪
  • IO多路复用

  • 信号驱动IO

  • 异步IO

IO多路复用

文件描述符:简称FD

select

流程:

  1. 创建fd_set rfds

    1. 假设要监听的fd为1,2,5
  2. 执行select(5+1,null,null,3)

    1. 第一个参数为最大长度+1,后面是等待时间
  3. 内核遍历fd

    1. 没有数据,休眠

      1. 等待数据就绪或者超时
  4. 遍历fd_set 找到就绪的数据

存在的问题

  • 需要将整个fd_set从用户空间拷贝到内核空间,select结束后还要拷贝回用户空间
  • 需要遍历一次
  • 最大为监听1024

poll

将数据改为了链表,还是需要遍历

epoll

  1. epoll_create 创建epoll实例
  2. epoll_ctl 添加需要监听的fd,关联callback
  3. epoll_wait 等待fd就绪

epoll_wait有两种通知模式

  • levelTriggered 简称LT 当FD有数据可读的时候,会重复通知多次,直到数据处理完成。是epoll的默认模式
  • EdgeTriggered 简称ET 。当FD有数据可读的时候只会被通知一次,不管数据是否处理完成

ET模式避免了LT的惊群效应。

ET模式最好结合非阻塞IO。

淘汰策略

分类:全体,ttl

LRU

抽样LRU 不是严格的

LFU

RedisObject中使用逻辑访问次数

访问频率越高,增加的越小,还会衰减(16位记录时间,8位记录逻辑访问次数)

总结

image.png

附带的tcp连接redis客户端 使用go编写的

package main

import (
    "bufio"
    "errors"
    "fmt"
    "io"
    "net"
    "strconv"
)

// RedisClient 封装用于连接
type RedisClient struct {
    conn net.Conn
    //包装一层 方便读写
    writer *bufio.Writer
    reader *bufio.Reader
}

func main() {
    //连接redis
    conn, err := net.Dial("tcp", "127.0.0.1:6379")
    if err != nil {
       fmt.Println("连接redis失败", err)
    }
    client := RedisClient{conn: conn, writer: bufio.NewWriter(conn), reader: bufio.NewReader(conn)}
    defer client.conn.Close()
    //发送命令
    client.sendRequest([]string{"set", "name", "方块"})
    //zrange boards:2024-4 0 -1
    //client.sendRequest([]string{"zrange", "boards:2024-4", "0", "-1"})
    //LRANGE tlist 0 -1
    client.sendRequest([]string{"LRANGE", "tlist", "0", "-1"})

}

func (client *RedisClient) sendRequest(args []string) interface{} {

    length := len(args)
    firstCommand := fmt.Sprintf("%s%d", "*", length)
    client.writeCommand(firstCommand)
    for _, s := range args {
       n := len(s)
       client.writeCommand("$" + strconv.Itoa(n))
       client.writeCommand(s)
       println(n, s)
    }
    response := client.handleResponse()
    //fmt.Printf("%v", response)
    return response

}

// 写命令
func (client *RedisClient) writeCommand(s string) {
    client.conn.Write([]byte(s + "\r\n"))

}

// 解析返回结果
func (client *RedisClient) handleResponse() interface{} {

    //先读取第一个字符
    r, _, _ := client.reader.ReadRune()
    flag := string(r)
    fmt.Println("第一个操作数:" + flag)
    switch flag {
    case "+":
       //一行字符串
       return client.ReadLine()
    case "-":
       //异常
       return client.ReadLine()
    case ":":
       //数字
       line := client.ReadLine()
       res, _ := strconv.Atoi(line)
       return res
    case "$":
       // 多行字符串
       //readRune, _, _ := client.reader.ReadRune()
       //去掉换行
       readRune := client.ReadLine()
       length := string(readRune)
       if length == "-1" {
          return nil
       } else if length == "0" {
          return ""
       }
       lll, _ := strconv.Atoi(length)
       //+2是跳过\r\n
       bytes := make([]byte, lll+2)
       n, _ := client.reader.Read(bytes)
       return string(bytes[:n])
    case "*":
       //多行字符串 递归获取
       return client.readBulkString()
    default:
       return errors.New("错误")
    }
}

// 读一行
func (client *RedisClient) ReadLine() string {
    bytes, _, _ := client.reader.ReadLine()
    return string(bytes)
}

// 读到末尾 估计有点问题
func (client *RedisClient) ReadToEnd() string {

    var size = 1024
    bytes := make([]byte, size)
    var temp = ""
    for {
       n, err := client.reader.Read(bytes)
       temp += string(bytes[:n])
       //n, err := client.conn.Read(bytes)
       if err == io.EOF || n == 0 || n < size {
          break
       }

    }

    return temp
}
func (client *RedisClient) readBulkString() interface{} {

    counts, _ := strconv.Atoi(client.ReadLine())
    if counts <= 0 {
       return nil
    }
    //var resList = list.List{}
    var lists []interface{}
    for i := 0; i < counts; i++ {
       res := client.handleResponse()
       lists = append(lists, res)
       //fmt.Println("多行结果:" + fmt.Sprintf("%v", res))
    }

    return lists

}

与Redis网络模型相似的内容:

Redis网络模型

theme: condensed-night-purple highlight: androidstudio 主从复制原理 建立连接 从节点在配置了 replicaof 配置了主节点的ip和port 从库执行replicaof 并发送psync命令 同步数据到从库 主库bgsave生成RDB文件,并

通过redis学网络(2)-redis网络模型

> 本系列主要是为了对redis的网络模型和集群原理进行学习,我会用golang实现一个reactor网络模型,并实现对redis协议的解析。 系列源码已经上传github ```go https://github.com/HobbyBear/tinyredis/tree/chapter2 ```

通过redis学网络(1)-用go基于epoll实现最简单网络通信框架

![image.png](https://img2023.cnblogs.com/blog/1382767/202306/1382767-20230607105418219-574417823.png) > 本系列主要是为了对redis的网络模型进行学习,我会用golang实现一个reactor网络

[转帖]Redis客户端Jedis、Lettuce、Redisson

https://www.jianshu.com/p/90a9e2eccd73 在SpringBoot2.x之后,原来使用的jedis被替换为了lettuce Jedis:采用的直连,BIO网络模型 Jedis有一个问题:多个线程使用一个连接的时候线程不安全。 解决思路是: 使用连接池,为每个请求创建

(三)Redis 线程与IO模型

1、Redis 单线程 通常说 Redis 是单线程,主要是指 Redis 的网络 IO 和键值对读写是由一个线程来完成的,其他功能,比如持久化、异步删除、集群数据同步等,是由额外的线程执行的,所以严格来说,Redis 并不是单线程。 多线程开发会不可避免的带来并发控制和资源开销的问题,如果没有良好

[转帖]高性能IO模型:为什么单线程Redis能那么快?

https://zhuanlan.zhihu.com/p/596170085 你好,我是蒋德钧。 今天,我们来探讨一个很多人都很关心的问题:“为什么单线程的Redis能那么快?” 首先,我要和你厘清一个事实,我们通常说,Redis是单线程,主要是指Redis的网络IO和键值对读写是由一个线程来完成的

Redis单线程

Redis是基于Reactor模式开发的网络事件处理器,这个处理器是单线程的,所 以redis是单线程的。 为什么它是单线程还那么快呢? 主要有以下几个原因: 一、纯内存操作 由于Redis是纯内存操作,相比于磁盘来说,内存就快得多,这个是Redis快的主要 原因。 二、多路复用I/O机制(NIO)

腾讯音乐:说说Redis脑裂问题?

Redis 脑裂问题是指,在 Redis 哨兵模式或集群模式中,由于网络原因,导致主节点(Master)与哨兵(Sentinel)和从节点(Slave)的通讯中断,此时哨兵就会误以为主节点已宕机,就会在从节点中选举出一个新的主节点,此时 Redis 的集群中就出现了两个主节点的问题,就是 Redis

StackExchange.Redis跑起来,为什么这么溜?

StackExchange.Redis 是一个高性能的 Redis 客户端库,主要用于 .NET 环境下与 Redis 服务器进行通信,大名鼎鼎的stackoverflow 网站就使用它。它使用异步编程模型,能够高效处理大量请求。支持 Redis 的绝大部分功能,包括发布/订阅、事务、Lua 脚本等...

即时通讯系统为什么选择GaussDB(for Redis)?

摘要:如果你需要一款稳定可靠的高性能企业级KV数据库,不妨试试GaussDB(for Redis)。 每当网络上爆出热点新闻,混迹于各个社交媒体的小伙伴们全都开启了讨论模式。一条消息的产生是如何在群聊中传递的呢?让我们一起来探索即时通讯系统(IM)的原理。 IM系统架构的原理 当你在群聊“相亲相爱一