专栏名称: 石杉的架构笔记
专注原创、用心雕琢!十余年BAT一线大厂架构经验倾囊相授
目录
相关文章推荐
开发者全社区  ·  Top2本硕:一把好牌打的稀烂 ·  昨天  
开发者全社区  ·  当初和你结婚,对你的定位是P7 ·  昨天  
开发者全社区  ·  山东相亲食物链:码农沦为底层 ·  昨天  
开发者全社区  ·  女孩住酒店注意安全 ·  3 天前  
51好读  ›  专栏  ›  石杉的架构笔记

Redis:我的一条命令是如何执行的?

石杉的架构笔记  · 公众号  ·  · 2019-12-22 10:00

正文

公众号后台回复“ 学习 ”,免费获取精品学习资料

扫描下方海报 试听


本文来自程序员历小冰的投稿


今天我们来了解一下 Redis 命令执行的过程。

在之前的文章中 《当 Redis 发生高延迟时,到底发生了什么》 我们曾简单的描述了一条命令的执行过程,本篇文章展示深入说明一下,加深读者对 Redis 的了解。

如下图所示,一条命令执行完成并且返回数据一共涉及三部分

第一步是建立连接阶段,响应了socket的建立,并且创建了client对象;

第二步是处理阶段,从socket读取数据到输入缓冲区,然后解析并获得命令,执行命令并将返回值存储到输出缓冲区中;

第三步是数据返回阶段,将返回值从输出缓冲区写到socket中,返回给客户端,最后关闭client。

这三个阶段之间是通过事件机制串联了,在 Redis 启动阶段首先要注册socket连接建立事件处理器:

  • 当客户端发来建立socket的连接的请求时,对应的处理器方法会被执行,建立连接阶段的相关处理就会进行,然后注册socket读取事件处理器

  • 当客户端发来命令时,读取事件处理器方法会被执行,对应处理阶段的相关逻辑都会被执行,然后注册socket写事件处理器

  • 当写事件处理器被执行时,就是将返回值写回到socket中。

接下来,我们分别来看一下各个步骤的具体原理和代码实现。

启动时监听socket

Redis 服务器启动时,会调用 initServer 方法,首先会建立 Redis 自己的事件机制 eventLoop,然后在其上注册周期时间事件处理器,最后在所监听的 socket 上创建文件事件处理器,监听 socket 建立连接的事件,其处理函数为 acceptTcpHandler。

  1. void initServer(void) { // server.c

  2. ....

  3. /**

  4. * 创建eventLoop

  5. */

  6. server .el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);

  7. /* Open the TCP listening socket for the user commands. */


  8. if (server.port != 0 &&

  9. listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)

  10. exit(1);


  11. /**

  12. * 注册周期时间事件,处理后台操作,比如说客户端操作、过期键等

  13. */

  14. if (aeCreateTimeEvent (server.el, 1, serverCron, NULL, NULL) == AE_ERR) {

  15. serverPanic("Can't create event loop timers.");

  16. exit(1);

  17. }

  18. /**

  19. * 为所有监听的socket创建文件事件,监听可读事件;事件处理函数为acceptTcpHandler

  20. *

  21. */

  22. for (j = 0; j < server.ipfd_count; j++) {

  23. if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,

  24. acceptTcpHandler,NULL) == AE_ERR)

  25. {

  26. serverPanic(

  27. "Unrecoverable error creating server.ipfd file event.");

  28. }

  29. }

  30. ....

  31. }

《Redis 事件机制详解》 一文中,我们曾详细介绍过 Redis 的事件机制,可以说Redis 命令执行过程中都是由事件机制协调管理的,也就是 initServer 方法中生成的 aeEventLoop。

当socket发生对应的事件时,aeEventLoop 对调用已经注册的对应的事件处理器。

建立连接和Client

当客户端向 Redis 建立 socket时,aeEventLoop 会调用 acceptTcpHandler 处理函数,服务器会为每个链接创建一个 Client 对象,并创建相应文件事件来监听socket的可读事件,并指定事件处理函数。

acceptTcpHandler 函数会首先调用 anetTcpAccept 方法,它底层会调用 socket 的 accept 方法,也就是接受客户端来的建立连接请求,然后调用 acceptCommonHandler 方法,继续后续的逻辑处理。

  1. // 当客户端建立链接时进行的eventloop处理函数 networking.c

  2. void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {

  3. ....

  4. // 层层调用,最后在anet.c 中 anetGenericAccept 方法中调用 socket 的 accept 方法

  5. cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);

  6. if (cfd == ANET_ERR) {

  7. if (errno != EWOULDBLOCK)

  8. serverLog(LL_WARNING,

  9. "Accepting client connection: %s", server.neterr);

  10. return;

  11. }

  12. serverLog(LL_VERBOSE, "Accepted %s:%d", cip, cport);

  13. /**

  14. * 进行socket 建立连接后的处理

  15. */

  16. acceptCommonHandler(cfd,0,cip);

  17. }

acceptCommonHandler 则首先调用 createClient 创建 client,接着判断当前 client 的数量是否超出了配置的 maxclients,如果超过,则给客户端发送错误信息,并且释放 client

  1. static void acceptCommonHandler(int fd, int flags, char *ip) { //networking.c

  2. client *c;

  3. // 创建redisClient

  4. c = createClient(fd)

  5. // 当 maxClient 属性被设置,并且client数量已经超出时,给client发送error,然后释放连接

  6. if (listLength(server.clients) > server.maxclients) {

  7. char *err = "-ERR max number of clients reached\r\n";

  8. if (write (c->fd,err,strlen(err)) == -1) {

  9. }

  10. server.stat_rejected_conn++;

  11. freeClient(c);

  12. return;

  13. }

  14. .... // 处理为设置密码时默认保护状态的客户端连接

  15. // 统计连接数

  16. server.stat_numconnections++;

  17. c->flags |= flags;

  18. }

createClient 方法用于创建 client,它代表着连接到 Redis 客户端,每个客户端都有各自的输入缓冲区和输出缓冲区,输入缓冲区存储客户端通过 socket 发送过来的数据,输出缓冲区则存储着 Redis 对客户端的响应数据。client一共有三种类型,不同类型的对应缓冲区的大小都不同。

  • 普通客户端是除了复制和订阅的客户端之外的所有连接

  • 从客户端用于主从复制,主节点会为每个从节点单独建立一条连接用于命令复制

  • 订阅客户端用于发布订阅功能

createClient 方法除了创建 client 结构体并设置其属性值外,还会对 socket进行配置并注册读事件处理器

设置 socket 为 非阻塞 socket、设置 NO DELAY 和 SO KEEPALIVE标志位来关闭 Nagle 算法并且启动 socket 存活检查机制。

设置读事件处理器,当客户端通过 socket 发送来数据后,Redis 会调用 readQueryFromClient 方法。

  1. client *createClient(int fd) {

  2. client *c = zmalloc(sizeof (client));

  3. // fd 为 -1,表示其他特殊情况创建的client,redis在进行比如lua脚本执行之类的情况下也会创建client

  4. if (fd != -1) {

  5. // 配置socket为非阻塞、NO_DELAY不开启Nagle算法和SO_KEEPALIVE

  6. anetNonBlock(NULL,fd);

  7. anetEnableTcpNoDelay(NULL,fd);

  8. if (server.tcpkeepalive)

  9. anetKeepAlive(NULL,fd,server.tcpkeepalive);

  10. /**

  11. * 向 eventLoop 中注册了 readQueryFromClient。

  12. * readQueryFromClient 的作用就是从client中读取客户端的查询缓冲区内容。

  13. * 绑定读事件到事件 loop (开始接收命令请求)

  14. */

  15. if (aeCreateFileEvent(server.el,fd,AE_READABLE,

  16. readQueryFromClient, c) == AE_ERR)

  17. {

  18. close(fd);

  19. zfree (c);

  20. return NULL;

  21. }

  22. }

  23. // 默认选择数据库

  24. selectDb(c,0);

  25. uint64_t client_id;

  26. atomicGetIncr(server.next_client_id,client_id,1);

  27. c->id = client_id;

  28. c->fd = fd;

  29. .... // 设置client的属性

  30. return c;

  31. }

client 的属性中有很多属性,比如后边会看到的输入缓冲区 querybuf 和输出缓冲区 buf,这里因为代码过长做了省略,感兴趣的同学可以自行阅读源码。

读取socket数据到输入缓冲区

readQueryFromClient 方法会调用 read 方法从 socket 中读取数据到输入缓冲区中,然后判断其大小是否大于系统设置的client max querybuf_len,如果大于,则向 Redis返回错误信息,并关闭 client。

将数据读取到输入缓冲区后,readQueryFromClient 方法会根据 client 的类型来做不同的处理

如果是普通类型,则直接调用 processInputBuffer 来处理;如果是主从客户端,还需要将命令同步到自己的从服务器中。

也就是说,Redis实例将主实例传来的命令执行后,继续将命令同步给自己的从实例。

  1. // 处理从client中读取客户端的输入缓冲区内容。

  2. void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {

  3. client *c = (client*) privdata;

  4. ....

  5. if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;

  6. c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);

  7. // 从 fd 对应的socket中读取到 client 中的 querybuf 输入缓冲区

  8. nread = read(fd, c->querybuf+qblen, readlen);

  9. if (nread == -1) {

  10. .... // 出错释放 client

  11. } else if (nread == 0) {

  12. // 客户端主动关闭 connection

  13. serverLog(LL_VERBOSE, "Client closed connection");

  14. freeClient(c);

  15. return;

  16. } else if (c->flags & CLIENT_MASTER) {

  17. /*

  18. * 当这个client代表主从的master节点时,将query buffer和 pending_querybuf结合

  19. * 用于主从复制中的命令传播????

  20. */

  21. c->pending_querybuf = sdscatlen(c->pending_querybuf,

  22. c->querybuf+qblen,nread);

  23. }

  24. // 增加已经读取的字节数

  25. sdsIncrLen(c->querybuf,nread);

  26. c ->lastinteraction = server.unixtime;

  27. if (c->flags & CLIENT_MASTER) c->read_reploff += nread;

  28. server.stat_net_input_bytes += nread;

  29. // 如果大于系统配置的最大客户端缓存区大小,也就是配置文件中的client-query-buffer-limit

  30. if (sdslen(c->querybuf) > server.client_max_querybuf_len) {

  31. sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

  32. // 返回错误信息,并且关闭client

  33. bytes = sdscatrepr(bytes,c->querybuf,64);

  34. serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);

  35. sdsfree(ci);

  36. sdsfree(bytes);

  37. freeClient(c);

  38. return;

  39. }



  40. if (!(c->flags & CLIENT_MASTER)) {

  41. // processInputBuffer 处理输入缓冲区

  42. processInputBuffer (c);

  43. } else {

  44. // 如果client是master的连接

  45. size_t prev_offset = c->reploff;

  46. processInputBuffer(c);

  47. // 判断是否同步偏移量发生变化,则通知到后续的slave

  48. size_t applied = c->reploff - prev_offset;


  49. if (applied) {

  50. replicationFeedSlavesFromMasterStream(server.slaves,

  51. c-> pending_querybuf, applied);

  52. sdsrange(c->pending_querybuf,applied,-1);

  53. }

  54. }

  55. }

解析获取命令

processInputBuffer 主要是将输入缓冲区中的数据解析成对应的命令,根据命令类型是 PROTO REQ MULTIBULK 还是 PROTO REQ INLINE,来分别调用 processInlineBuffer 和 processMultibulkBuffer 方法来解析命令。

然后调用 processCommand 方法来执行命令。执行成功后,如果是主从客户端,还需要更新同步偏移量 reploff 属性,然后重置 client,让client可以接收一条命令。

  1. void processInputBuffer(client *c) { // networking.c

  2. server.current_client = c;

  3. /* 当缓冲区中还有数据时就一直处理 */

  4. while(sdslen(c->querybuf)) {

  5. .... // 处理 client 的各种状态

  6. /* 判断命令请求类型 telnet发送的命令和redis-cli发送的命令请求格式不同 */

  7. if (!c->reqtype) {

  8. if (c->querybuf[0] == '*') {

  9. c->reqtype = PROTO_REQ_MULTIBULK;

  10. } else {

  11. c->reqtype = PROTO_REQ_INLINE;

  12. }

  13. }

  14. /**

  15. * 从缓冲区解析命令

  16. */

  17. if (c->reqtype == PROTO_REQ_INLINE) {

  18. if (processInlineBuffer(c) != C_OK) break;

  19. } else if (c->reqtype == PROTO_REQ_MULTIBULK) {

  20. if (processMultibulkBuffer(c) != C_OK) break;

  21. } else {

  22. serverPanic("Unknown request type");

  23. }


  24. /* 参数个数为0时重置client,可以接受下一个命令 */

  25. if (c->argc == 0) {

  26. resetClient(c);

  27. } else {

  28. // 执行命令

  29. if (processCommand(c) == C_OK) {

  30. if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {

  31. // 如果是master的client发来的命令,则 更新 reploff

  32. c->reploff = c->read_reploff - sdslen(c->querybuf);

  33. }


  34. // 如果不是阻塞状态,则重置client,可以接受下一个命令

  35. if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)

  36. resetClient(c);

  37. }

  38. }

  39. }

  40. server.current_client = NULL;

  41. }

解析命令暂时不看,就是将 redis 命令文本信息,记录到client的argv/argc属性中

执行命令

processCommand 方法会处理很多逻辑,不过大致可以分为三个部分:首先是调用 lookupCommand 方法获得对应的 redisCommand;接着是检测当前 Redis 是否可以执行该命令;最后是调用 call 方法真正执行命令。

processCommand会做如下逻辑处理:

  • 1 如果命令名称为 quit,则直接返回,并且设置客户端标志位。

  • 2 根据 argv[0] 查找对应的 redisCommand,所有的命令都存储在命令字典 redisCommandTable 中,根据命令名称可以获取对应的命令。

  • 3 进行用户权限校验。

  • 4 如果是集群模式,处理集群重定向。当命令发送者是 master 或者 命令没有任何 key 的参数时可以不重定向。

  • 5 预防 maxmemory 情况,先尝试回收一下,如果不行,则返回异常。

  • 6 当此服务器是 master 时:aof 持久化失败时,或上一次 bgsave 执行错误,且配置 bgsave 参数和 stopwritesonbgsaveerr;禁止执行写命令。

  • 7 当此服务器时master时:如果配置了 replminslavestowrite,当slave数目小于时,禁止执行写命令。

  • 8 当时只读slave时,除了 master 的不接受其他写命令。

  • 9 当客户端正在订阅频道时,只会执行部分命令。

  • 10 服务器为slave,但是没有连接 master 时,只会执行带有 CMD_STALE 标志的命令,如 info 等

  • 11 正在加载数据库时,只会执行带有 CMD_LOADING 标志的命令,其余都会被拒绝。

  • 12 当服务器因为执行lua脚本阻塞时,只会执行部分命令,其余都会拒绝

  • 13 如果是事务命令,则开启事务,命令进入等待队列;否则直接执行命令。

  1. int processCommand(client *c) {

  2. // 1 处理 quit 命令

  3. if (! strcasecmp(c->argv[0]->ptr,"quit")) {

  4. addReply(c,shared.ok);

  5. c->flags |= CLIENT_CLOSE_AFTER_REPLY;

  6. return C_ERR;

  7. }


  8. /**

  9. * 根据 argv[0] 查找对应的 command

  10. * 2 命令字典查找指定命令;所有的命令都存储在命令字典中 struct redisCommand redisCommandTable[]={}

  11. */

  12. c->cmd = c->lastcmd = lookupCommand(c->argv[0]-> ptr);

  13. if (!c->cmd) {

  14. // 处理未知命令

  15. } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||

  16. (c->argc < -c->cmd->arity)) {

  17. // 处理参数错误

  18. }

  19. // 3 检查用户验证

  20. if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)

  21. {

  22. flagTransaction(c);

  23. addReply(c,shared.noautherr);

  24. return C_OK;

  25. }


  26. /**

  27. * 4 如果是集群模式,处理集群重定向。当命令发送者是master或者 命令没有任何key的参数时可以不重定向

  28. */

  29. if (server.cluster_enabled &&

  30. !(c->flags & CLIENT_MASTER) &&

  31. !(c->flags & CLIENT_LUA &&

  32. server.lua_caller->flags & CLIENT_MASTER) &&

  33. !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&

  34. c->cmd->proc != execCommand))

  35. {

  36. int hashslot;

  37. int error_code;

  38. // 查询可以执行的node信息

  39. clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,

  40. &hashslot,&error_code);

  41. if (n == NULL || n != server.cluster->myself) {

  42. if (c->cmd->proc == execCommand) {

  43. discardTransaction(c);

  44. } else {

  45. flagTransaction(c);

  46. }

  47. clusterRedirectClient(c,n,hashslot,error_code);

  48. return C_OK;

  49. }

  50. }


  51. // 5 处理maxmemory请求,先尝试回收一下,如果不行,则返回异常

  52. if (server.maxmemory) {

  53. int retval = freeMemoryIfNeeded();

  54. ....

  55. }


  56. /**

  57. * 6 当此服务器是master时:aof持久化失败时,或上一次bgsave执行错误,

  58. * 且配置bgsave参数和stop_writes_on_bgsave_err;禁止执行写命令

  59. */

  60. if (((server.stop_writes_on_bgsave_err &&

  61. server.saveparamslen > 0 &&

  62. server.lastbgsave_status == C_ERR) ||

  63. server.aof_last_write_status == C_ERR) &&

  64. server.masterhost == NULL &&

  65. (c->cmd->flags & CMD_WRITE ||

  66. c->cmd->proc == pingCommand)) { .... }


  67. /**

  68. * 7 当此服务器时master时:如果配置了repl_min_slaves_to_write,

  69. * 当slave数目小于时,禁止执行写命令

  70. */

  71. if (server.masterhost == NULL &&

  72. server.repl_min_slaves_to_write &&

  73. server. repl_min_slaves_max_lag &&

  74. c->cmd->flags & CMD_WRITE &&

  75. server.repl_good_slaves_count < server.repl_min_slaves_to_write) { .... }


  76. /**

  77. * 8 当时只读slave时,除了master的不接受其他写命令

  78. */

  79. if (server.masterhost && server.repl_slave_ro &&

  80. !(c->flags & CLIENT_MASTER) &&

  81. c->cmd->flags & CMD_WRITE) { .... }


  82. /**

  83. * 9 当客户端正在订阅频道时,只会执行以下命令

  84. */

  85. if (c->flags & CLIENT_PUBSUB &&

  86. c->cmd->proc != pingCommand &&

  87. c->cmd->proc != subscribeCommand &&

  88. c->cmd->proc != unsubscribeCommand &&

  89. c->cmd->proc != psubscribeCommand &&

  90. c-> cmd->proc != punsubscribeCommand) { .... }

  91. /**

  92. * 10 服务器为slave,但没有正确连接master时,只会执行带有CMD_STALE标志的命令,如info等

  93. */

  94. if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&

  95. server.repl_serve_stale_data == 0 &&

  96. !(c->cmd->flags & CMD_STALE)) {...}

  97. /**

  98. * 11 正在加载数据库时,只会执行带有CMD_LOADING标志的命令,其余都会被拒绝

  99. */

  100. if (server.loading && !(c->cmd->flags & CMD_LOADING)) { .... }

  101. /**

  102. * 12 当服务器因为执行lua脚本阻塞时,只会执行以下几个命令,其余都会拒绝

  103. */

  104. if (server.lua_timedout &&

  105. c->cmd->proc != authCommand &&

  106. c->cmd->proc != replconfCommand &&

  107. !(c->cmd->proc == shutdownCommand &&

  108. c->argc == 2 &&

  109. tolower(((char*)c->argv[1]->ptr)[0]) == 'n'







请到「今天看啥」查看全文