readQueryFromClient
/* Read handler for a client connection (excerpt: the "......" line marks
 * code that was elided from the original function).
 * NOTE(review): the non-ASCII characters fused in front of `void` on the
 * signature line are leftover heading text from the surrounding notes, not
 * C; they must be stripped before this can compile.
 *
 * Visible flow: fetch the client attached to the connection, return early
 * when postponeClientRead() returns non-zero, and otherwise hand the client
 * to processInputBuffer(). */
函数void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, big_arg = 0;
size_t qblen, readlen;
/* Check if we want to read from the client later when exiting from
* the event loop. This is the case if threaded I/O is enabled. */
// This is the path taken by the multi-threaded I/O model; this article
// analyzes the single-threaded mode, which does not take this branch.
if (postponeClientRead(c)) return;
......
/* There is more data in the client input buffer, continue parsing it
* and check if there is a full command to execute. */
// Execution reaches this point and starts processing the command sent by
// the client.
// NOTE(review): on C_ERR only the local pointer is cleared; presumably the
// client may already have been freed at that point -- confirm.
if (processInputBuffer(c) == C_ERR)
c = NULL;
}
processInputBuffer
/* Consume the data buffered in c->querybuf and run the commands found in it
 * (excerpt: the "......" lines mark code elided from the original function).
 *
 * The loop runs while c->qb_pos is below the length of the query buffer and
 * stops early when any of the client-flag checks below matches.
 *
 * Returns C_ERR as soon as processCommandAndResetClient() returns C_ERR,
 * otherwise C_OK. */
int processInputBuffer(client *c) {
/* Keep processing while there is something in the input buffer */
while(c->qb_pos < sdslen(c->querybuf)) {
/* Each of the following flag checks leaves the loop without consuming
* any further input. */
if (c->flags & CLIENT_BLOCKED) break;
if (c->flags & CLIENT_PENDING_COMMAND) break;
if (isInsideYieldingLongCommand() && c->flags & CLIENT_MASTER) break;
if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
......
/* Multibulk processing could see a <= 0 length. */
if (c->argc == 0) {
resetClient(c);
} else {
/* When io_threads_op is not IDLE the command is not executed here:
* the client is only flagged CLIENT_PENDING_COMMAND and the loop
* exits. */
if (io_threads_op != IO_THREADS_OP_IDLE) {
serverAssert(io_threads_op == IO_THREADS_OP_READ);
c->flags |= CLIENT_PENDING_COMMAND;
break;
}
/* We are finally ready to execute the command. */
// The command is finally executed here.
if (processCommandAndResetClient(c) == C_ERR) {
return C_ERR;
}
}
}
......
return C_OK;
}
processCommandAndResetClient
/* Run the command currently held by client `c` with `c` installed as
 * server.current_client, then put the previous current client back.
 *
 * Returns C_ERR when server.current_client was found to be NULL after the
 * command was processed (the client is then treated as dead), and C_OK
 * otherwise. */
int processCommandAndResetClient(client *c) {
    client *saved_current = server.current_client;

    server.current_client = c;
    /* The command is processed here. */
    if (processCommand(c) == C_OK) {
        commandProcessed(c);
        /* Account for output buffer growth caused by the command that was
         * just processed. */
        updateClientMemUsageAndBucket(c);
    }

    /* performEvictions may flush slave output buffers, which may result in
     * a slave -- possibly the active client -- being freed. A NULL
     * current_client at this point is what marks the client as dead, so the
     * check must happen before the saved value is restored. */
    int client_is_dead = (server.current_client == NULL);

    /* Restore the previous client. This is needed because when a script
     * times out this code is entered from processEventsWhileBlocked, which
     * sets server.current_client again. Without the restore the caller
     * would be told, falsely, that the client is dead and would stop
     * reading from its buffer. */
    server.current_client = saved_current;

    if (client_is_dead) {
        return C_ERR;
    }
    return C_OK;
}
processCommand
/* Look up the command held in c->argv and either queue it or execute it
 * (excerpt: the "......" lines mark code elided from the original function).
 *
 * When the client has CLIENT_MULTI set and the command is not one of
 * EXEC / DISCARD / MULTI / WATCH / QUIT / RESET, the command is queued and
 * shared.queued is sent as the reply. In every other case the command is
 * run through call().
 *
 * Returns C_OK once the command has been queued or executed. */
int processCommand(client *c) {
    ......
    /* Now lookup the command and check ASAP about trivial error conditions
     * such as wrong arity, bad command name and so forth. */
    // The command is looked up here; sentinel commands are handled here too.
    // Per the author's note: unlike older versions (before 6.0), which
    // overrode the command table, a lookup is used instead.
    c->cmd = c->lastcmd = c->realcmd = lookupCommand(c->argv,c->argc);
    ......
    /* Exec the command */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand &&
        c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand &&
        c->cmd->proc != watchCommand &&
        c->cmd->proc != quitCommand &&
        c->cmd->proc != resetCommand)
    {
        queueMultiCommand(c, cmd_flags);
        addReply(c,shared.queued);
    } else {
        // Ordinary commands are ultimately executed through call().
        call(c,CMD_CALL_FULL);
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys) && !isInsideYieldingLongCommand())
            handleClientsBlockedOnKeys();
    }

    /* The function is declared int and processCommandAndResetClient()
     * compares its result with C_OK, so falling off the end without a
     * value would be undefined behavior. */
    return C_OK;
}
/* Execute the command attached to client `c` by invoking the handler stored
 * in c->cmd->proc (excerpt: the "......" lines mark code elided from the
 * original function).
 *
 * `flags` is not used in the visible portion; presumably it is consumed by
 * the elided code -- confirm against the full source. */
void call(client *c, int flags) {
......
/* server.in_nested_call is incremented for the duration of the handler
* invocation and decremented right after it returns. */
server.in_nested_call++;
c->cmd->proc(c);
server.in_nested_call--;
......
}
call 最终调用 cmd 对应的 proc。执行 lookupCommand 时就会找到对应的 command 对象,由 redisCommandTable 可知,对应的处理函数为 getCommand。
t_string.c
/* Command handler that simply forwards the client to getGenericCommand().
 * The status code returned by getGenericCommand() is intentionally
 * discarded because this handler returns void. */
void getCommand(client *c)
{
    (void)getGenericCommand(c);
}
/* Reply to client `c` with the value stored under the key c->argv[1].
 *
 * Returns C_OK when the lookup yields NULL or when the value has been added
 * to the reply, and C_ERR when checkType() rejects the value as not being
 * an OBJ_STRING. */
int getGenericCommand(client *c)
{
    /* Search the db for the key. shared.null[c->resp] is handed to the
     * lookup helper as the reply to use (presumably when the key is
     * missing -- confirm against lookupKeyReadOrReply). */
    robj *value = lookupKeyReadOrReply(c, c->argv[1], shared.null[c->resp]);
    if (value == NULL) {
        return C_OK;
    }

    if (checkType(c, value, OBJ_STRING) != 0) {
        return C_ERR;
    }

    /* Write the value into the output buffer; it is sent once the event
     * loop finds the connection writable. */
    addReplyBulk(c, value);
    return C_OK;
}