Redis 7 Source Code Analysis: The Redis Startup Flow

1. Redis starts from the main function in server.c

int main(int argc, char **argv) {
...
    // The code above reads the configuration file and parses command-line
    // arguments; the startup flow proper begins at this log line
    serverLog(LL_WARNING, "oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo");
...
    // Initialize the server (event loop, listening sockets, time events, ...)
    initServer();
...
    // Enter the event loop; this call only returns when the server stops
    aeMain(server.el);
}

The most important steps in main:

1.1 Step one: run initServer

void initServer(void) {
...
    // Create the eventLoop. This is one of Redis's most critical structures;
    // every event the server handles is monitored through it
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
...
    /* Listen on the TCP port and obtain the listening file descriptors
     * (ipfd), through which client commands will arrive */
    if (server.port != 0 &&
        listenToPort(server.port,&server.ipfd) == C_ERR) {
        /* Note: the following log text is matched by the test suite. */
        serverLog(LL_WARNING, "Failed listening on port %u (TCP), aborting.", server.port);
        exit(1);
    }
...
    // Create the periodic background time event, which invokes serverCron
    // on a schedule
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }
...
    // Bind acceptTcpHandler as the accept handler for ipfd.
    // In practice this watches ipfd for readability: once it becomes readable,
    // a connection has arrived and acceptTcpHandler fires immediately
    if (createSocketAcceptHandler(&server.ipfd, acceptTcpHandler) != C_OK) {
        serverPanic("Unrecoverable error creating TCP socket accept handler.");
    }
...
    // Register the callbacks run before and after each event-loop iteration,
    // similar to Spring's around advice
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);

}
The three most important steps in initServer:
1. Create the eventLoop, the core of Redis.
2. Listen on the TCP port, the entry point for all subsequent network I/O (select, epoll, etc.); a minimal sketch of this step follows the list.
3. Bind the handler function for TCP accept events.
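
Step 2 above is plain BSD-socket work. Here is a minimal, self-contained POSIX sketch of what a listenToPort-style helper boils down to; it is illustrative code under stated assumptions (IPv4 only, hypothetical name listen_to_port), not the real implementation in anet.c:

#include <arpa/inet.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

static int listen_to_port(int port) {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd == -1) return -1;

    /* Allow fast restarts: reuse the address even if it is in TIME_WAIT. */
    int yes = 1;
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));

    struct sockaddr_in sa;
    memset(&sa, 0, sizeof(sa));
    sa.sin_family = AF_INET;
    sa.sin_port = htons(port);
    sa.sin_addr.s_addr = htonl(INADDR_ANY);

    if (bind(fd, (struct sockaddr *)&sa, sizeof(sa)) == -1 ||
        listen(fd, 511) == -1) {   /* 511 matches Redis's default tcp-backlog */
        close(fd);
        return -1;
    }

    /* Non-blocking, so a later accept() can never stall the event loop. */
    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
    return fd;
}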

1.2 After the server is initialized, aeMain runs. ae.c is the heart of Redis: an event-driven programming library

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

This is effectively an infinite loop: it keeps waiting for events and invoking the handlers bound to them. The real work happens in aeProcessEvents:

int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
...
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        // Run the before-sleep callback
        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        // Dispatches to a different system call depending on the OS
        // (epoll, kqueue, select, ...); the goal is simply to find the
        // fds that are readable or writable
        numevents = aeApiPoll(eventLoop, tvp);

        /* Run the after-sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        for (j = 0; j < numevents; j++) {
            int fd = eventLoop->fired[j].fd;
            aeFileEvent *fe = &eventLoop->events[fd];
            int mask = eventLoop->fired[j].mask;
            int fired = 0; /* Number of events fired for current fd. */
            ...

            // aeApiPoll has already tagged each fired event with its type;
            // the mask intersection below picks out readable events
            if (!invert && fe->mask & mask & AE_READABLE) {
                // Invoke the handler bound to the readable event
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }

            // Same mask intersection, this time for writable events
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    // Invoke the handler bound to the writable event
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
...
            processed++;
        }
    }

}
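
The dispatch above hinges on a bitmask intersection: a handler runs only if the relevant bit is set both in what the fd registered for (fe->mask) and in what the poller reported (mask). A tiny standalone illustration, reusing the flag values from ae.h:

#include <stdio.h>

#define AE_READABLE 1   /* same values as in ae.h */
#define AE_WRITABLE 2

int main(void) {
    int registered = AE_READABLE;              /* fd registered for reads only */
    int reported = AE_READABLE | AE_WRITABLE;  /* poller reported both */

    /* Only the read handler runs: the write bit is not in the intersection. */
    if (registered & reported & AE_READABLE) printf("run read handler\n");
    if (registered & reported & AE_WRITABLE) printf("run write handler\n");
    return 0;
}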

1.2.1 A look at ae_epoll.c, which uses the epoll system calls to find readable and writable fds

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    // epfd is the epoll fd; every fd to be monitored was attached to it
    // earlier via epoll_ctl. Here we wait for fds to become ready
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        // Walk the ready fds and record them in the event loop's fired
        // array, preparing for the processing step. This only marks them;
        // the handlers run later in aeProcessEvents
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    } else if (retval == -1 && errno != EINTR) {
        panic("aeApiPoll: epoll_wait, %s", strerror(errno));
    }

    return numevents;
}
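
If epoll itself is unfamiliar, the following minimal, self-contained loop shows the same epoll_create / epoll_ctl / epoll_wait pattern that ae_epoll.c wraps. It assumes listen_fd is a non-blocking listening socket (e.g. from the listen_to_port sketch earlier); it is an illustrative echo loop, not Redis code:

#include <errno.h>
#include <stdio.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#define MAX_EVENTS 64

static void run_epoll_loop(int listen_fd) {
    int epfd = epoll_create1(0);
    if (epfd == -1) { perror("epoll_create1"); return; }

    /* Attach the listening socket; readable means "connection pending". */
    struct epoll_event ev = { .events = EPOLLIN, .data.fd = listen_fd };
    epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);

    struct epoll_event events[MAX_EVENTS];
    for (;;) {
        int n = epoll_wait(epfd, events, MAX_EVENTS, -1);
        if (n == -1) {
            if (errno == EINTR) continue;   /* interrupted by a signal */
            perror("epoll_wait");
            break;
        }
        for (int i = 0; i < n; i++) {
            int fd = events[i].data.fd;
            if (fd == listen_fd) {
                /* New connection: accept it and start watching its fd too. */
                int cfd = accept(listen_fd, NULL, NULL);
                if (cfd == -1) continue;
                struct epoll_event cev = { .events = EPOLLIN, .data.fd = cfd };
                epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &cev);
            } else if (events[i].events & EPOLLIN) {
                /* Client fd readable: read what arrived and echo it back. */
                char buf[512];
                ssize_t r = read(fd, buf, sizeof(buf));
                if (r <= 0) { close(fd); continue; }  /* EOF or error */
                write(fd, buf, (size_t)r);
            }
        }
    }
    close(epfd);
}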

1.3 Because acceptTcpHandler was bound earlier, it is called whenever ipfd becomes readable

acceptTcpHandler lives in networking.c:

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) {
        // Accept the client connection and obtain its fd. Because we only
        // get here after epoll reports ipfd readable, accept will not block
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        // 1. Wrap the client fd in a connection object
        // 2. Run acceptCommonHandler on it
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}
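
anetTcpAccept is a thin wrapper around accept. A simplified, IPv4-only sketch of what it boils down to (the real helper in anet.c also handles IPv6 and Unix sockets; the name tcp_accept here is hypothetical):

#include <arpa/inet.h>
#include <errno.h>
#include <netinet/in.h>
#include <stddef.h>
#include <sys/socket.h>

static int tcp_accept(int listen_fd, char *ip, size_t ip_len, int *port) {
    struct sockaddr_in sa;
    socklen_t salen = sizeof(sa);
    int fd;
    while ((fd = accept(listen_fd, (struct sockaddr *)&sa, &salen)) == -1) {
        if (errno == EINTR) continue;   /* interrupted by a signal: retry */
        return -1;  /* includes EWOULDBLOCK: no more pending connections */
    }
    if (ip) inet_ntop(AF_INET, &sa.sin_addr, ip, ip_len);
    if (port) *port = ntohs(sa.sin_port);
    return fd;
}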

1.3.1 Creating the socket connection object

connection.c

typedef struct ConnectionType {
    void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);
    int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);
    int (*write)(struct connection *conn, const void *data, size_t data_len);
    int (*writev)(struct connection *conn, const struct iovec *iov, int iovcnt);
    int (*read)(struct connection *conn, void *buf, size_t buf_len);
    void (*close)(struct connection *conn);
    int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
    int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
    int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
    const char *(*get_last_error)(struct connection *conn);
    int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);
    ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
    ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
    ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
    int (*get_type)(struct connection *conn);
} ConnectionType;

ConnectionType CT_Socket = {
    .ae_handler = connSocketEventHandler,
    .close = connSocketClose,
    .write = connSocketWrite,
    .writev = connSocketWritev,
    .read = connSocketRead,
    .accept = connSocketAccept,
    .connect = connSocketConnect,
    .set_write_handler = connSocketSetWriteHandler,
    .set_read_handler = connSocketSetReadHandler,
    .get_last_error = connSocketGetLastError,
    .blocking_connect = connSocketBlockingConnect,
    .sync_write = connSocketSyncWrite,
    .sync_read = connSocketSyncRead,
    .sync_readline = connSocketSyncReadLine,
    .get_type = connSocketGetType
};

connection *connCreateSocket() {
    connection *conn = zcalloc(sizeof(connection));
    conn->type = &CT_Socket;
    conn->fd = -1;

    return conn;
}

// This function creates a connection object and binds the accepted fd to it
connection *connCreateAcceptedSocket(int fd) {
    connection *conn = connCreateSocket();
    conn->fd = fd;
    conn->state = CONN_STATE_ACCEPTING;
    return conn;
}
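
The ConnectionType table above is the classic C idiom for polymorphism: a struct of function pointers acting as a vtable, so that different connection kinds (plain TCP, TLS, ...) share one calling convention. A tiny standalone analog of the pattern:

#include <stdio.h>

typedef struct Shape Shape;

typedef struct ShapeType {
    double (*area)(Shape *s);
    const char *(*name)(Shape *s);
} ShapeType;

struct Shape {
    ShapeType *type;   /* like conn->type pointing at CT_Socket */
    double w, h;
};

static double rect_area(Shape *s) { return s->w * s->h; }
static const char *rect_name(Shape *s) { (void)s; return "rect"; }

static ShapeType RectType = { .area = rect_area, .name = rect_name };

int main(void) {
    Shape r = { .type = &RectType, .w = 3, .h = 4 };
    /* Dispatch through the table, just like conn->type->read(conn, ...). */
    printf("%s area = %g\n", r.type->name(&r), r.type->area(&r));
    return 0;
}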

1.3.2 Running acceptCommonHandler

static void acceptCommonHandler(connection *conn, int flags, char *ip) {
...
    // The crucial step: create the client
    if ((c = createClient(conn)) == NULL) {
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (conn: %s)",
            connGetLastError(conn),
            connGetInfo(conn, conninfo, sizeof(conninfo)));
        connClose(conn); /* May be already closed, just ignore errors */
        return;
    }

...
}

1.3.3 Creating the client, one per connected client

client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));

    /* passing NULL as conn it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    if (conn) {
        connEnableTcpNoDelay(conn);
        if (server.tcpkeepalive)
            connKeepAlive(conn,server.tcpkeepalive);
        // The key step: bind readQueryFromClient as the read handler
        connSetReadHandler(conn, readQueryFromClient);
        // Attach the client to the connection as its private data
        connSetPrivateData(conn, c);
    }
    ...

}
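
connSetPrivateData stores the client on the connection so that later callbacks, which only receive the connection, can recover the client (readQueryFromClient does exactly this via connGetPrivateData). A minimal standalone analog of this context-pointer pattern, with hypothetical names:

#include <stdio.h>

typedef struct Conn {
    int fd;
    void *private_data;   /* points back to the owning client */
} Conn;

typedef struct Client { int id; } Client;

static void on_readable(Conn *c) {
    /* The callback gets only the connection; the client is recovered
     * from the private data, as Redis does with connGetPrivateData. */
    Client *cl = (Client *)c->private_data;
    printf("data ready for client %d on fd %d\n", cl->id, c->fd);
}

int main(void) {
    Client cl = { .id = 42 };
    Conn conn = { .fd = 7, .private_data = &cl };
    on_readable(&conn);
    return 0;
}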

1.3.4 Binding the read event: the entry point through which the server reads commands sent by clients

connection.h

/* Register a read handler, to be called when the connection is readable.
 * If NULL, the existing handler is removed.
 */
static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    return conn->type->set_read_handler(conn, func);
}

From the way the connection is constructed, conn->type here is CT_Socket, so set_read_handler dispatches to connSocketSetReadHandler:

static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    if (func == conn->read_handler) return C_OK;

    conn->read_handler = func;
    if (!conn->read_handler)
        aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
    else
        // Register a readable file event whose job is to call back into
        // read_handler, i.e. readQueryFromClient.
        // The fd here is the client fd obtained from accept
        if (aeCreateFileEvent(server.el,conn->fd,
                    AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
    return C_OK;
}
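
Note the indirection: the file event registered with ae is conn->type->ae_handler (connSocketEventHandler), not read_handler itself. When the fd fires, ae_handler is what invokes conn->read_handler. A minimal standalone analog of that two-step dispatch, with hypothetical names:

#include <stdio.h>

typedef struct Conn Conn;
typedef void (*Callback)(Conn *conn);

struct Conn {
    Callback read_handler;   /* e.g. readQueryFromClient */
};

/* Plays the role of connSocketEventHandler: the single entry point the
 * event loop knows about; it forwards to the registered read handler. */
static void ae_handler(Conn *conn) {
    if (conn->read_handler) conn->read_handler(conn);
}

static void read_query(Conn *conn) { (void)conn; printf("reading query\n"); }

int main(void) {
    Conn c = { .read_handler = read_query };
    ae_handler(&c);   /* what the event loop does when the fd fires */
    return 0;
}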

At this point the readable event for commands sent by clients is fully registered, and the startup logic is complete.

So far a single thread has executed everything. Does that mean Redis does all of its work on one thread?

Not quite: Redis has two modes, a single-threaded mode and a multi-threaded (I/O threads) mode.

These will be analyzed in follow-up articles.
