Redis 技术专题系列之网络架构和线程模型

它采用了 I/O 多路复用技术,利用单个线程处理所有客户端请求和网络 I/O 操作。同时,在 ae.c 文件中,还有以下关于多路复用模型的代码:

引言

Redis 是一个高性能、内存存储的键值数据库。它支持丰富的数据结构,如字符串、哈希、列表、集合和有序集合。Redis 的性能得益于其网络架构和线程模型。

本文将介绍 Redis 的网络架构和线程模型,希望能够帮助读者更好地理解 Redis 的性能优化和运维管理。

大家好,这里是互联网技术学堂,留下你的点赞、关注、分享,支持一下吧,谢谢。

Redis 技术专题系列之网络架构和线程模型

网络架构

Redis 的网络架构是基于单线程的事件循环模型。它采用了 I/O 多路复用技术,利用单个线程处理所有客户端请求和网络 I/O 操作。

网络编程离不开 Socket,网络 I/O 模型常用的无非是同步阻塞、同步非阻塞、异步阻塞、异步非阻塞,高性能网络服务器常见的线程模型也就是基于 EventLoop 模式的单线程模型。

Redis 技术专题系列之网络架构和线程模型

当客户端连接到 Redis 服务器时,服务器会为每个客户端分配一个文件描述符,并将其加入到事件循环机制中。当有数据可读或可写时,Redis 会触发相应的事件,单线程通过事件循环机制调度事件处理函数来处理客户端请求和网络 I/O 操作。

这种单线程的事件循环模型带来了以下优点:

  1. 简单:不需要线程切换和同步机制,减少了锁竞争和上下文切换的开销。
  2. 可扩展性:通过多个 Redis 实例和数据分片来实现横向扩展。
  3. 高可用性:通过主从复制和哨兵机制来实现高可用性。
Redis 技术专题系列之网络架构和线程模型

线程模型

Redis 采用的是单线程的事件循环模型,但它并不是单进程的。Redis 可以通过配置文件中的 daemonize 选项将自己变成守护进程,同时可以通过 fork 函数来创建子进程来处理持久化操作和复制操作。

子进程主要用于持久化和复制操作,Redis 采用了写时复制(Copy-On-Write,简称 COW)技术来优化子进程的性能。当子进程需要进行写操作时,Redis 会将需要修改的数据复制一份,修改完成后再将修改后的数据替换原来的数据。

这种写时复制技术带来了以下优点:

  1. 性能:减少了内存复制的开销,提高了写操作的性能。
  2. 数据:避免了在多个进程之间共享同一块内存带来的数据问题。

总体来说,Redis 的线程模型和网络架构都是非常不错的,可以帮助 Redis 实现高性能、高可用和可扩展的特性。因此,掌握 Redis 的网络架构和线程模型是 Redis 开发和运维的重要知识点,也是高频面试题的考点之一。

Redis 技术专题系列之网络架构和线程模型

源码层面分析网络架构

Redis 的网络架构主要包含以下两个部分:

1、事件驱动模型

Redis 使用 epoll 等事件驱动模型来处理客户端的连接、读写操作。具体实现代码在 ae.c 文件中。

首先,Redis 会创建一个事件循环机制,不断地监听和处理事件。在初始化 Redis 服务器时,通过 aeCreateEventLoop() 函数创建一个事件循环机制。

aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
if (aeApiCreate(eventLoop) == -1) goto err;
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}

然后,Redis 会将客户端的连接事件和读写事件加入到事件循环机制中。具体实现代码在 networking.c 文件中。

static void acceptCommonHandler(int fd, int flags, char *ip) {
int cport, cfd, max = server.maxclients;
char cip[NET_IP_STR_LEN];
listNode *ln;
client *c;
if ((cfd = anetTcpAccept(server.neterr, fd, cip, &cport)) == ANET_ERR) {
if (errno != EWOULDBLOCK) {
serverLog(LL_WARNING, "Accepting client connection: %s", server.neterr);
}
return;
}
anetNonBlock(NULL,cfd);
anetEnableTcpNoDelay(NULL,cfd);
if (server.tcpkeepalive)
anetKeepAlive(NULL, cfd, server.tcpkeepalive);
if (max && listLength(server.clients) >= max) {
serverLog(LL_VERBOSE,"WARNING: maximum number of clients reached");
close(cfd);
return;
}
ln = listAddNodeTail(server.clients,NULL);
c = zmalloc(sizeof(*c));
c->connfd = cfd;
c->firsttime = time(NULL);
c->lastinteraction = c->lastping = c->lastoutput = server.unixtime;
c->authenticated = 0;
c->fd = cfd;
c->name = NULL;
c->querybuf = sdsempty();
c->reqtype = 0;
c->argc = 0

在上面的代码中,anetTcpAccept() 函数用于监听客户端的连接事件,并返回连接客户端的文件描述符。如果有客户端连接,就会执行 listAddNodeTail() 函数将客户端添加到 server.clients 链表中,并为客户端分配一个 client 结构体对象。在 client 结构体对象中,保存了客户端的连接描述符、读写缓冲区、近一次交互时间等信息。

然后,Redis 会将客户端的连接描述符和读写事件加入到事件循环机制中,使得 Redis 能够及时地处理客户端的请求。具体实现代码在 ae.c 文件中。

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}

在上面的代码中,aeCreateFileEvent() 函数用于将文件描述符和事件类型加入到事件循环机制中,实现了对客户端的读写事件监听。

2、多路复用模型

Redis 的多路复用模型主要使用了 epoll 函数实现,可以同时监听多个客户端的读写事件。具体实现代码在 ae_epoll.c 文件中。

static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
eventLoop->apidata = state;
return 0;
}
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
if (epoll_ctl(state->epfd,op,

在上面的代码中,aeApiCreate() 函数用于创建 epoll 实例,同时初始化事件数组和事件循环机制。aeApiAddEvent() 函数用于将文件描述符和事件类型加入到 epoll 实例中,实现了对客户端的读写事件监听。

同时,在 ae.c 文件中,还有以下关于多路复用模型的代码:

/* Process events in the main loop */
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
/* Wait for milliseconds for events to happen.
* If milliseconds is -1, wait indefinitely.
* Returns the number of events processed. */
int aeWait(int fd, int mask, long long milliseconds) {
struct epoll_event events[1];
int retval, numevents = 0;
if (milliseconds != -1) {
milliseconds /= 1000;
milliseconds++; /* rounding up */
}
retval = epoll_wait(fd, events, 1, milliseconds);
if (retval > 0) {
if (events[0].events & EPOLLIN) numevents |= AE_READABLE;
if (events[0].events & EPOLLOUT) numevents |= AE_WRITABLE;
if (events[0].events & EPOLLERR) numevents |= AE_WRITABLE;
if (events[0].events & EPOLLHUP) numevents |= AE_WRITABLE;
return numevents;
} else {
return retval;
}
}
/* Process every pending time event, then every pending file event
* (that may be registered by time event callbacks just processed).
* Without special flags the function sleeps until some file event
* fires, or when the next time event occurs (if any).
* If flags is 0, the function does nothing and returns.
* if flags has AE_ALL_EVENTS set, all the kind of events are processed.
* if flags has AE_FILE_EVENTS set, file events are processed.
* if flags has AE_TIME_EVENTS set, time events are processed.
* if flags has AE_DONT_WAIT set the function returns ASAP until all
* the events that's possible to process without to wait are processed. */
void aeProcessEvents(aeEventLoop *eventLoop, int flags) {
int processed = 0, numevents;
/* Nothing to do, return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return;
/* Note that we want to sleep if we have no events to process. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms;
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
tvp->tv_sec = shortest->when_sec - now_sec;
if (shortest->when_ms < now_ms) {
tvp->tv_usec = ((shortest->when_ms+1000) - now_ms)*1000;
tv } else {
tvp->tv_usec = (shortest->when_ms - now_ms)*1000;
}
if (tvp->tv_sec < 0) tvp->tv_sec = 0;
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */
/* Note the fe->mask & mask & ... code: maybe an already
* processed event removed an element that fired and we
* still didn't processed, so we check if the event is still
* valid. */
if (fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop, fd, fe->clientData, mask);
fired++;
}
if (fe->mask & mask & AE_WRITABLE) {
fe->wfileProc(eventLoop, fd, fe->clientData, mask);
fired++;
}
if (fe->mask & mask & AE_ERROR) {
fe->efileProc(eventLoop, fd, fe->clientData, mask);
fired++;
}
if (fe->mask & mask & AE_HUP) {
fe->hupfileProc(eventLoop, fd, fe->clientData, mask);
fired++;
}
processed++;
}
}

`aeMain()` 函数是 Redis 事件循环的核心,其中的 `while` 循环不断处理事件,如果事件循环被设置为停止状态,那么就跳出循环。

`aeWait()` 函数主要用于等待事件的发生,如果等待的时间为 -1,表示等待无限长的时间,否则就等待指定的时间后返回。

`aeProcessEvents()` 函数用于处理文件事件和时间事件,其中 `aeSearchNearestTimer()` 函数用于寻找近的时间事件,并将其加入到时间事件的链表中,然后计算出时间事件距离当前时间的时间差,用于设置 epoll_wait() 函数的超时时间。

在 `ae.c` 文件中,还有以下与 Redis 事件循环相关的代码:

/* Create a new event loop */
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events ==)

总结

在 Redis 的事件循环中,主要采用了多路复用技术来实现,即使用 epoll 等系统调用来监控文件描述符,以及使用时间事件来处理一些需要在指定时间后执行的任务。

Redis 的事件循环采用单线程的方式实现,但是在处理文件事件时,会使用 I/O 多路复用技术来提高性能,同时还会使用时间事件来实现一些定时任务。

在 Redis 的事件循环中,时间事件和文件事件是分别处理的,时间事件使用链表来保存,文件事件使用 epoll 或 select 等系统调用来实现。在每次循环中,Redis 事件循环都会处理所有已就绪的文件事件,并且会寻找并处理近的时间事件,从而实现了良好的事件处理和定时任务的执行。

了解 Redis 事件循环的实现原理,有助于我们更好地理解 Redis 的工作原理,并且能够更好地调试和优化 Redis 服务器,也是面试中常被问及的高频知识点。

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://yundeesoft.com/49358.html

(0)
上一篇 2024-04-23 11:33
下一篇 2024-05-10 08:00

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信