Purpose

Years ago, in my 《Redis源码剖析笔记》, I wrote about Redis's threading model at the time, covering the background threads and the Redis 6 I/O threads. By now, Redis 8 has completely rewritten and greatly optimized its I/O threading, so I'm taking this opportunity to revisit the topic and document the Redis 8 multi-threading code along the way.

Background threads

Redis starts 3 background threads to handle operations such as closing file descriptors, flushing the AOF to disk, and lazily freeing the memory of deleted keys.

They use a producer-consumer model: a queue plus a mutex and a condition variable (a minimal standalone sketch of this pattern appears below).

RDB file creation uses a child process (fork), so strictly speaking Redis is both multi-threaded and multi-process.
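
To make the pattern concrete, here is a minimal, self-contained sketch of the queue + mutex + condition variable pattern. This is not Redis code; the names are invented for illustration. The worker sleeps on the condition variable while the queue is empty, and the producer signals it after enqueuing a job:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct node { int job; struct node *next; };

static struct node *head;                        /* job queue (singly linked list) */
static pthread_mutex_t mu = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t newjob = PTHREAD_COND_INITIALIZER;

static void enqueue(int job) {                   /* producer: "main thread" */
    struct node *n = malloc(sizeof(*n));
    n->job = job;
    pthread_mutex_lock(&mu);
    n->next = head; head = n;
    pthread_cond_signal(&newjob);                /* wake a sleeping worker */
    pthread_mutex_unlock(&mu);
}

static void *worker(void *arg) {                 /* consumer: "background thread" */
    (void)arg;
    for (int handled = 0; handled < 3; handled++) { /* this toy stops after 3 jobs */
        pthread_mutex_lock(&mu);
        while (head == NULL)                     /* no job: sleep on the condvar */
            pthread_cond_wait(&newjob, &mu);
        struct node *n = head; head = n->next;
        pthread_mutex_unlock(&mu);
        printf("processing job %d\n", n->job);   /* do the work outside the lock */
        free(n);
    }
    return NULL;
}

int main(void) {
    pthread_t tid;
    pthread_create(&tid, NULL, worker, NULL);
    for (int i = 0; i < 3; i++) enqueue(i);
    pthread_join(tid, NULL);
    return 0;
}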

Background asynchronous tasks

bio.h bio.c

  • In short, three threads are created, each handling one of three task types. Tasks are organized as linked lists using the producer-consumer pattern; when there is no pending task, the thread sleeps on its condition variable.
/* bio.h */
#define BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */
#define BIO_AOF_FSYNC 1 /* Deferred AOF fsync. */
#define BIO_LAZY_FREE 2 /* Deferred objects freeing. */
#define BIO_NUM_OPS 3 /* Number of threads == number of job types */

/* bio.c */
static pthread_t bio_threads[BIO_NUM_OPS];
static pthread_mutex_t bio_mutex[BIO_NUM_OPS];
static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS];
static pthread_cond_t bio_step_cond[BIO_NUM_OPS];
/* Job queues, one linked list per job type */
static list *bio_jobs[BIO_NUM_OPS];
/* Number of pending jobs per job type */
static unsigned long long bio_pending[BIO_NUM_OPS];
/* Job structure */
struct bio_job {
    time_t time; /* Time at which the job was created. */
    /* Job specific arguments pointers */
    void *arg1, *arg2, *arg3;
};

/* Background threads initialization */
void bioInit(void) {
    pthread_attr_t attr;
    pthread_t thread;
    int j;

    /* Initialization of state vars and objects */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        pthread_mutex_init(&bio_mutex[j],NULL);
        pthread_cond_init(&bio_newjob_cond[j],NULL);
        pthread_cond_init(&bio_step_cond[j],NULL);
        bio_jobs[j] = listCreate();
        bio_pending[j] = 0;
    }

    ... /* Code setting the thread stack size via attr is omitted */

    /* Ready to spawn our threads. We use the single argument the thread
     * function accepts in order to pass the job ID the thread is
     * responsible of. */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread;
    }
}

/* Event loop of each background thread, producer-consumer pattern */
void *bioProcessBackgroundJobs(void *arg) {
    struct bio_job *job;
    unsigned long type = (unsigned long) arg; /* The job type this thread handles */
    sigset_t sigset;

    ...

    /* Block SIGALRM in this background thread so that only the main thread
     * will receive this signal. */
    pthread_mutex_lock(&bio_mutex[type]);
    sigemptyset(&sigset);
    sigaddset(&sigset, SIGALRM);
    if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        serverLog(LL_WARNING,
            "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));

    while(1) {
        listNode *ln;

        /* The loop always starts with the lock held: if there is no pending
         * job, sleep on the condition variable until a producer signals us. */
        if (listLength(bio_jobs[type]) == 0) {
            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
            continue;
        }
        /* Get the first job from the queue of this job type */
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        /* We can release the lock here: the job structure is now ours to process. */
        pthread_mutex_unlock(&bio_mutex[type]);

        /* Dispatch on the type of background job */
        if (type == BIO_CLOSE_FILE) {
            close((long)job->arg1); /* Deferred close(2) */
        } else if (type == BIO_AOF_FSYNC) {
            redis_fsync((long)job->arg1); /* Deferred AOF fsync */
        } else if (type == BIO_LAZY_FREE) {
            /* Lazy free job: call the proper free function depending on which
             * arguments are set */
            if (job->arg1) lazyfreeFreeObjectFromBioThread(job->arg1);
            else if (job->arg2 && job->arg3)
                lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
            else if (job->arg3)
                lazyfreeFreeSlotsMapFromBioThread(job->arg3);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        zfree(job);

        /* Take the lock again before reiterating: once the job is done, remove
         * it from the queue and decrement the pending counter. */
        pthread_mutex_lock(&bio_mutex[type]);
        listDelNode(bio_jobs[type],ln);
        bio_pending[type]--;
    }
}

/* API to create a background job */
void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
    struct bio_job *job = zmalloc(sizeof(*job));

    job->time = time(NULL);
    job->arg1 = arg1;
    job->arg2 = arg2;
    job->arg3 = arg3;
    pthread_mutex_lock(&bio_mutex[type]);
    listAddNodeTail(bio_jobs[type],job); /* Append at the tail of the queue */
    bio_pending[type]++; /* One more pending job of this type */
    pthread_cond_signal(&bio_newjob_cond[type]);
    pthread_mutex_unlock(&bio_mutex[type]);
}

Asynchronous vs. synchronous deletion

lazyfree.c

int dbAsyncDelete(redisDb *db, robj *key) {
    /* Deleting the key from the expires dict right away is safe: the key name
     * is a shared object, so this only decrements its refcount. */
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);

    /* Unlink the entry from the main hash table without freeing its memory yet. */
    dictEntry *de = dictUnlink(db->dict,key->ptr);
    if (de) {
        /* Get the value stored in the entry */
        robj *val = dictGetVal(de);
        size_t free_effort = lazyfreeGetFreeEffort(val);

        if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
            atomicIncr(lazyfree_objects,1);
            /* Free the value asynchronously in a background thread, since it may be large */
            bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
            /* Set the value to NULL in the entry so it is not freed twice */
            dictSetVal(db->dict,de,NULL);
        }
    }
    /* The key and the hash table entry itself are freed directly in the main thread. */
    if (de) {
        dictFreeUnlinkedEntry(db->dict,de);
        if (server.cluster_enabled) slotToKeyDel(key);
        return 1;
    } else {
        return 0;
    }
}
/* So only the value is freed in the background thread; the key and the entry
 * are still freed synchronously. */
/* Synchronous deletion: free everything in place */
int dbSyncDelete(redisDb *db, robj *key) {
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
    if (dictDelete(db->dict,key->ptr) == DICT_OK) {
        if (server.cluster_enabled) slotToKeyDel(key);
        return 1;
    } else {
        return 0;
    }
}

Redis 6 I/O threads

Implementation approach

  Before Redis 6.0, when a socket became readable or writable, the main thread handled it directly, i.e. it would read or write the socket right away. Writes are slightly different: replies are first placed in an output buffer, and Redis walks the clients with pending writes before re-entering the event loop.

  In Redis 6.0, when the main thread sees a readable event on a TCP connection and invokes the read callback, the callback's logic has changed: if certain conditions are met, it postpones handling the read and puts the client into a global pending list. In effect, during one pass of the event loop, the actual I/O of all read and write events is deferred into two pending lists, one for reads and one for writes.

  Before entering the next iteration of the event loop, the main thread runs a callback called beforeSleep. This function performs the dispatch: the clients collected in those lists are distributed evenly to the I/O threads in a round-robin fashion (the main thread also takes a share). All threads then process the I/O tasks assigned to them, and after finishing its own share the main thread waits for the I/O threads to finish. The check in the source code is to sum the pending counters of every I/O thread; when the sum reaches 0, all I/O threads are done. The main thread then finishes processing the clients whose I/O has completed, for example by executing the parsed commands. So in Redis's multi-threading only the main thread executes commands; the I/O threads are responsible only for I/O and command parsing.

  Redis's multi-threading is therefore a multi-I/O-thread design: what it distributes are the clients' I/O events, not the connections themselves.

  This design is not fully parallel; it mixes serial and parallel phases. The main thread has to collect all read/write events into the pending lists, distribute the work round-robin, and, after all I/O threads have finished their I/O, it alone executes the commands. During those phases only the main thread is working while the I/O threads sit idle, so the model is not fully parallel, the performance gain is limited, and the main thread takes on the extra work of collecting and distributing tasks.

Selected code

/* Thread descriptors of the I/O threads */
pthread_t io_threads[IO_THREADS_MAX_NUM];
/* Mutexes, one per I/O thread */
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
/* Number of pending clients per I/O thread */
threads_pending io_threads_pending[IO_THREADS_MAX_NUM];
/* This is the list of clients each thread will serve when threaded I/O is
 * used. */
list *io_threads_list[IO_THREADS_MAX_NUM];

Thread initialization

/* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) {
    server.io_threads_active = 0; /* We start with threads not active. */

    /* Indicate that io-threads are currently idle */
    io_threads_op = IO_THREADS_OP_IDLE;

    /* Don't spawn any thread if the user selected a single thread:
     * we'll handle I/O directly from the main thread. */
    if (server.io_threads_num == 1) return;

    if (server.io_threads_num > IO_THREADS_MAX_NUM) {
        serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
                             "The maximum number is %d.", IO_THREADS_MAX_NUM);
        exit(1);
    }

    /* Spawn and initialize the I/O threads. */
    for (int i = 0; i < server.io_threads_num; i++) {
        /* Things we do for all the threads including the main thread. */
        io_threads_list[i] = listCreate();
        if (i == 0) continue; /* Thread 0 is the main thread. */

        /* Things we do only for the additional threads. */
        pthread_t tid;
        pthread_mutex_init(&io_threads_mutex[i],NULL);
        setIOPendingCount(i, 0);
        /* Taking this lock keeps the I/O thread out of its loop until the main
         * thread has work for it; it is released in startThreadedIO(), called
         * from handleClientsWithPendingWritesUsingThreads(). */
        pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
        if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
            exit(1);
        }
        io_threads[i] = tid;
    }
}

I/O thread main loop

void *IOThreadMain(void *myid) {
    /* The ID is the thread number (from 0 to server.io_threads_num-1), and is
     * used by the thread to just manipulate a single sub-array of clients. */
    long id = (unsigned long)myid;
    char thdname[16];

    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    redis_set_thread_title(thdname);
    redisSetCpuAffinity(server.server_cpulist);
    makeThreadKillable();

    while(1) {
        /* Spin for a while, waiting for the main thread to assign work */
        for (int j = 0; j < 1000000; j++) {
            if (getIOPendingCount(id) != 0) break;
        }

        /* Give the main thread a chance to stop this thread. */
        if (getIOPendingCount(id) == 0) {
            /* Usually the thread blocks here, because the main thread holds
             * the mutex until it has distributed work. */
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        serverAssert(getIOPendingCount(id) != 0);

        /* Process: note that the main thread will never touch our list
         * before we drop the pending count to 0. */
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        setIOPendingCount(id, 0);
    }
}
/* Postponing reads */
void readQueryFromClient(connection *conn) {
    ...
    if (postponeClientRead(c)) return;
    ...
}

int postponeClientRead(client *c) {
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !clientsArePaused() &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
    {
        c->flags |= CLIENT_PENDING_READ;
        /* Put the pending read into the global clients_pending_read list */
        listAddNodeHead(server.clients_pending_read, c);
        return 1;
    } else {
        return 0;
    }
}
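
The fan-out/join step described earlier (beforeSleep distributes pending clients round-robin, every thread processes its own list, and the main thread waits until the sum of the pending counters drops to 0) is not shown in the excerpts above. The following is a minimal, self-contained sketch of just that pattern with invented names; it is not the actual Redis code (handleClientsWithPendingReadsUsingThreads), only the same idea in miniature:

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

#define NUM_THREADS 4   /* thread 0 plays the role of the main thread */
#define NUM_ITEMS   16

static int items[NUM_THREADS][NUM_ITEMS];         /* per-thread task lists */
static int item_count[NUM_THREADS];
static atomic_int pending[NUM_THREADS];           /* pending counter per thread */

static void process_list(int id) {
    for (int i = 0; i < item_count[id]; i++)
        printf("thread %d handles item %d\n", id, items[id][i]);
    item_count[id] = 0;
    atomic_store(&pending[id], 0);                /* signal: my share is done */
}

static void *io_thread(void *arg) {
    int id = (int)(long)arg;
    while (atomic_load(&pending[id]) == 0) ;      /* wait for the main thread's handoff */
    process_list(id);                             /* this toy runs a single round */
    return NULL;
}

int main(void) {
    pthread_t tids[NUM_THREADS];
    for (int i = 1; i < NUM_THREADS; i++)
        pthread_create(&tids[i], NULL, io_thread, (void *)(long)i);

    /* Round-robin distribution, like beforeSleep does for pending clients */
    for (int item = 0; item < NUM_ITEMS; item++) {
        int target = item % NUM_THREADS;
        items[target][item_count[target]++] = item;
    }
    for (int i = 1; i < NUM_THREADS; i++)
        atomic_store(&pending[i], item_count[i]); /* hand the lists over */

    process_list(0);                              /* main thread processes its own share */

    /* Busy-wait join: done when the sum of all pending counters is 0 */
    for (;;) {
        int sum = 0;
        for (int i = 1; i < NUM_THREADS; i++) sum += atomic_load(&pending[i]);
        if (sum == 0) break;
    }
    printf("all I/O done, main thread can now execute commands\n");

    for (int i = 1; i < NUM_THREADS; i++) pthread_join(tids[i], NULL);
    return 0;
}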

Redis 8 I/O threads

See the PR:
https://github.com/redis/redis/pull/13695

I also recommend an article explaining the Valkey 8.0 multi-threading implementation: https://cloud.tencent.com/developer/article/2552574

The result: the I/O threads and the main thread are fully asynchronous. The main thread no longer performs any I/O; once it accepts a client it hands it straight to an I/O thread, and its epoll no longer needs to watch the clients it has handed off. The main thread only has to execute commands.

The official description (translated)

Overall architecture

As before, we did not change the core principle that all client commands must be executed on the main thread. Redis was originally designed as a single-threaded architecture, and executing commands from multiple threads would inevitably introduce numerous race conditions and synchronization problems. But now each I/O thread has its own independent event loop, so I/O threads can use multiplexing to handle client reads and writes, eliminating the CPU overhead caused by busy-waiting.

As illustrated in the PR's diagram, the execution flow can be briefly described as follows:

  1. The main thread assigns the client to an I/O thread after accepting the connection.
  2. The I/O thread notifies the main thread after it has read and parsed a complete query from the client.
  3. The main thread processes the queries coming from the I/O threads and generates the replies.
  4. After the main thread hands the list of clients back to the I/O threads, the I/O threads are responsible for writing the replies to the clients.
  5. The I/O threads then continue handling the clients' read and write events.

Each I/O thread has its own event loop

We now assign each I/O thread its own event loop. This approach eliminates the need for the main thread to issue expensive epoll_wait system calls to handle connections (except for specific connections). Instead, the main thread processes requests coming from the I/O threads and hands them back when done, fully offloading read and write events to the I/O threads.

Additionally, all TLS operations, including handling pending data, have been moved entirely into the I/O threads. This resolves the issue where the io-threads-do-reads configuration could not be used together with TLS.

Event-notified client queues

To facilitate communication between the I/O threads and the main thread, we designed an event-notified client queue. Each I/O thread and the main thread own two such queues, storing clients that are waiting to be processed. These queues are also integrated with the event loop to support event-driven processing. We use a pthread mutex to ensure the thread safety of queue operations, as well as data visibility and ordering. Since each I/O thread and the main thread operate on independent queues, race conditions are minimized, and threads are not suspended due to lock contention. We also implemented an event notifier based on eventfd or a pipe to support event-driven processing.
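
To illustrate the mechanism, here is a minimal, self-contained sketch (not Redis code; the names are invented) of an event-notified queue: the producer pushes into a mutex-protected list and writes to an eventfd, and the consumer's event loop (a plain poll here) wakes up, drains the counter, and processes the queued items:

#include <pthread.h>
#include <poll.h>
#include <stdint.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/eventfd.h>

static int notifier_fd;                           /* eventfd used as the notifier */
static pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
static int queue[128];                            /* pending "clients" */
static int queue_len;

static void enqueue_and_notify(int item) {        /* producer side (e.g. main thread) */
    pthread_mutex_lock(&queue_mutex);
    queue[queue_len++] = item;
    pthread_mutex_unlock(&queue_mutex);
    uint64_t one = 1;
    write(notifier_fd, &one, sizeof(one));        /* fire the event */
}

static void *consumer(void *arg) {                /* consumer side (e.g. an I/O thread) */
    (void)arg;
    struct pollfd pfd = { .fd = notifier_fd, .events = POLLIN };
    for (int handled = 0; handled < 3; ) {
        poll(&pfd, 1, -1);                        /* event loop wakes on the eventfd */
        uint64_t count;
        read(notifier_fd, &count, sizeof(count)); /* drain the notification counter */
        pthread_mutex_lock(&queue_mutex);
        int local[128], n = queue_len;            /* move the batch to a local array */
        for (int i = 0; i < n; i++) local[i] = queue[i];
        queue_len = 0;
        pthread_mutex_unlock(&queue_mutex);
        for (int i = 0; i < n; i++, handled++)
            printf("processing pending client %d\n", local[i]);
    }
    return NULL;
}

int main(void) {
    notifier_fd = eventfd(0, 0);
    pthread_t tid;
    pthread_create(&tid, NULL, consumer, NULL);
    for (int i = 0; i < 3; i++) enqueue_and_notify(i);
    pthread_join(tid, NULL);
    close(notifier_fd);
    return 0;
}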

Thread safety

Since the main thread and the I/O threads can run in parallel, we must handle data races carefully.

  • client->flags: The main job of the I/O threads is reading and writing, i.e. readQueryFromClient and writeToClient. However, the I/O threads and the main thread may concurrently modify or access client->flags, leading to potential race conditions. To solve this, we introduced an io-flags variable to record the operations performed by the I/O threads, thereby avoiding races on client->flags.

  • Pausing I/O threads: In the main thread we may need to operate on an I/O thread's data, for example to uninstall an event handler, access or manipulate a query/output buffer, or resize the event loop. We need a clean and safe context for such operations. We pause the I/O thread in the IOThreadBeforeSleep function, perform the work, and then resume it. To avoid having threads suspended, we use busy waiting to confirm the target state. In addition, we use atomic variables to ensure memory visibility and ordering. We introduced the following functions to pause/resume the I/O threads:

pauseIOThread, resumeIOThread

pauseAllIOThreads, resumeAllIOThreads

pauseIOThreadsRange, resumeIOThreadsRange

Testing shows that pauseIOThread is very efficient: under stress the main thread can perform nearly 200,000 such operations per second. Similarly, pauseAllIOThreads with 8 I/O threads can handle nearly 56,000 operations per second. However, the operations performed between pausing and resuming the I/O threads must be very fast; otherwise the I/O threads may reach full CPU utilization.
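
As a rough, self-contained illustration of that handshake (invented names and states; not the actual pauseIOThread implementation), the pattern boils down to an atomic state variable that the main thread flips and then busy-waits on until the I/O thread acknowledges it at a safe point in its loop:

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

enum { UNPAUSED, PAUSING, PAUSED };               /* hypothetical pause states */
static atomic_int io_state = UNPAUSED;
static atomic_int stop = 0;

static void *io_thread(void *arg) {               /* stand-in for the I/O thread loop */
    (void)arg;
    while (!atomic_load(&stop)) {
        /* ... handle I/O events ... */
        if (atomic_load(&io_state) == PAUSING) {  /* safe point, e.g. before sleeping */
            atomic_store(&io_state, PAUSED);      /* acknowledge the pause */
            while (atomic_load(&io_state) == PAUSED) ; /* busy-wait for resume */
        }
    }
    return NULL;
}

static void pause_io_thread(void) {               /* called from the main thread */
    atomic_store(&io_state, PAUSING);
    while (atomic_load(&io_state) != PAUSED) ;    /* busy-wait for the acknowledgement */
}

static void resume_io_thread(void) {
    atomic_store(&io_state, UNPAUSED);
}

int main(void) {
    pthread_t tid;
    pthread_create(&tid, NULL, io_thread, NULL);
    pause_io_thread();
    printf("I/O thread paused: safe to touch its data from the main thread\n");
    resume_io_thread();
    atomic_store(&stop, 1);
    pthread_join(tid, NULL);
    return 0;
}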

freeClient and freeClientAsync: The main thread may need to terminate a client that is currently running on an I/O thread, for example because of an ACL rule change, reaching the output buffer limit, or evicting a client. In such cases we need to pause the corresponding I/O thread in order to operate on the client safely.

maxclients and maxmemory-clients updates: When maxclients is adjusted, we need to resize the event loop of every I/O thread. Similarly, when maxmemory-clients is modified, we need to traverse all clients to compute their memory usage. To make these operations safe, we pause all I/O threads during such adjustments.

Reading client information: The main thread may need to read a client's fields to build a descriptive string, for example for the CLIENT LIST command or for logging. In that case we need to pause the I/O thread handling that client; if information about all clients is needed, all I/O threads must be paused.

Tracking redirect: Redis supports tracking and can even send invalidation messages to a connection with a specified ID. But the target client may be running on an I/O thread; directly manipulating the client's output buffer is not thread-safe, and the I/O thread may not know that the client has a pending reply. In this case we pause the I/O thread handling that client, modify its output buffer, and install a write event handler to make sure it is handled correctly.

clientsCron: In clientsCron the main thread needs to iterate over all clients to perform operations such as timeout checks, verifying whether the soft output buffer limit has been reached, resizing the output/query buffers, or updating memory usage. To operate on a client safely, the I/O thread handling it must be paused. Pausing an I/O thread separately for every single client would be very inefficient, while pausing all I/O threads at once is expensive, especially when there are many I/O threads and clientsCron runs fairly often. To solve this, we pause the I/O threads in batches, at most 8 at a time; the operations above are only performed for clients running on the paused I/O threads, which significantly reduces the overhead while preserving safety.

Code walkthrough

Before diving into the walkthrough, keep the following questions in mind while reading:

  1. How does the main thread distribute connections to the I/O threads?
  2. What does an I/O thread do when it detects a read/write event?
  3. After finishing the read/write, how does the I/O thread notify the main thread to execute the command?
  4. While the main thread is executing a command, how is the I/O thread prevented from touching that client?

I/O thread logic

Many variables are self-explanatory from their names; for example, the following are all used to exchange clients between the main thread and the I/O threads:

/* For main thread */
static list *mainThreadPendingClientsToIOThreads[IO_THREADS_MAX_NUM]; /* Clients to IO threads */
static list *mainThreadProcessingClients[IO_THREADS_MAX_NUM]; /* Clients in processing */
static list *mainThreadPendingClients[IO_THREADS_MAX_NUM]; /* Pending clients from IO threads */
static pthread_mutex_t mainThreadPendingClientsMutexes[IO_THREADS_MAX_NUM]; /* Mutex for pending clients */
static eventNotifier* mainThreadPendingClientsNotifiers[IO_THREADS_MAX_NUM]; /* Notifier for pending clients */
/* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) {
    if (server.io_threads_num <= 1) return;

    server.io_threads_active = 1;
    ...
    /* Spawn and initialize the I/O threads. */
    for (int i = 1; i < server.io_threads_num; i++) {
        IOThread *t = &IOThreads[i];
        t->id = i;
        /* Each I/O thread gets its own event loop */
        t->el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
        t->el->privdata[0] = t;
        t->pending_clients = listCreate();
        t->processing_clients = listCreate();
        t->pending_clients_to_main_thread = listCreate();
        t->clients = listCreate();
        atomicSetWithSync(t->paused, IO_THREAD_UNPAUSED);
        atomicSetWithSync(t->running, 0);

        pthread_mutexattr_t *attr = NULL;
        pthread_mutex_init(&t->pending_clients_mutex, attr);
        /* Notifications between the main thread and this I/O thread go through an eventfd */
        t->pending_clients_notifier = createEventNotifier();
        if (aeCreateFileEvent(t->el, getReadEventFd(t->pending_clients_notifier),
                              AE_READABLE, handleClientsFromMainThread, t) != AE_OK)
        {
            serverLog(LL_WARNING, "Fatal: Can't register file event for IO thread notifications.");
            exit(1);
        }

        /* This is the timer callback of the IO thread, used to gradually handle
         * some background operations, such as clients cron. */
        if (aeCreateTimeEvent(t->el, 1, IOThreadCron, t, NULL) == AE_ERR) {
            serverLog(LL_WARNING, "Fatal: Can't create event loop timers in IO thread.");
            exit(1);
        }

        /* Create IO thread */
        if (pthread_create(&t->tid, NULL, IOThreadMain, (void*)t) != 0) {
            serverLog(LL_WARNING, "Fatal: Can't initialize IO thread.");
            exit(1);
        }

        /* For main thread */
        mainThreadPendingClientsToIOThreads[i] = listCreate();
        mainThreadPendingClients[i] = listCreate();
        mainThreadProcessingClients[i] = listCreate();
        pthread_mutex_init(&mainThreadPendingClientsMutexes[i], attr);
        mainThreadPendingClientsNotifiers[i] = createEventNotifier();
        if (aeCreateFileEvent(server.el, getReadEventFd(mainThreadPendingClientsNotifiers[i]),
                              AE_READABLE, handleClientsFromIOThread, t) != AE_OK)
        {
            serverLog(LL_WARNING, "Fatal: Can't register file event for main thread notifications.");
            exit(1);
        }
        if (attr) zfree(attr);
    }
}

Two functions deserve attention. The first is the I/O thread's main loop, IOThreadMain, which can simply be understood as running an epoll event loop; its beforeSleep/afterSleep hooks are not important here.

void *IOThreadMain(void *ptr) {
    IOThread *t = ptr;
    ...
    aeSetBeforeSleepProc(t->el, IOThreadBeforeSleep);
    aeSetAfterSleepProc(t->el, IOThreadAfterSleep);
    aeMain(t->el);
    return NULL;
}

The other is handleClientsFromMainThread(), the function through which the main thread tells an I/O thread to take over new clients; the I/O thread binds these clients to its own event loop.

/* After the main thread processes the clients, it will send the clients back to
 * io threads to handle, and fire an event, the io thread handles the event by
 * this function. */
void handleClientsFromMainThread(struct aeEventLoop *ae, int fd, void *ptr, int mask) {
    IOThread *t = ptr;

    handleEventNotifier(t->pending_clients_notifier);
    /* Process the clients from main thread. */
    processClientsFromMainThread(t);
}
/* Processing clients that have finished executing commands from the main thread.
 * If the client is not binded to the event loop, we should bind it first and
 * install read handler. If the client still has query buffer, we should process
 * the input buffer. If the client has pending reply, we just reply to client,
 * and then install write handler if needed. */
int processClientsFromMainThread(IOThread *t) {
    pthread_mutex_lock(&t->pending_clients_mutex);
    /* Move the new clients into this thread's internal list, then process them one by one */
    listJoin(t->processing_clients, t->pending_clients);
    pthread_mutex_unlock(&t->pending_clients_mutex);
    size_t processed = listLength(t->processing_clients);
    if (processed == 0) return 0;

    listIter li;
    listNode *ln;
    listRewind(t->processing_clients, &li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        serverAssert(!(c->io_flags & (CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED)));
        /* Main thread must handle clients with CLIENT_CLOSE_ASAP flag, since
         * we only set io_flags when clients in io thread are freed ASAP. */
        serverAssert(!(c->flags & CLIENT_CLOSE_ASAP));

        /* Link client in IO thread clients list first. */
        serverAssert(c->io_thread_client_list_node == NULL);
        listUnlinkNode(t->processing_clients, ln);
        listLinkNodeTail(t->clients, ln);
        c->io_thread_client_list_node = listLast(t->clients);

        /* The client now is in the IO thread, let's free deferred objects. */
        freeClientDeferredObjects(c, 0);
        ...

        /* Only bind once, we never remove read handler unless freeing client. */
        if (!connHasEventLoop(c->conn)) {
            connRebindEventLoop(c->conn, t->el);
            serverAssert(!connHasReadHandler(c->conn));
            /* Bind the client's fd to this thread's event loop; the read callback
             * is readQueryFromClient */
            connSetReadHandler(c->conn, readQueryFromClient);
        }

        /* If the client has pending replies, write replies to client. */
        if (clientHasPendingReplies(c)) {
            writeToClient(c, 0);
            if (!(c->io_flags & CLIENT_IO_CLOSE_ASAP) && clientHasPendingReplies(c)) {
                /* If the reply was not fully written, install the write handler */
                connSetWriteHandler(c->conn, sendReplyToClient);
            }
        }
    }
    /* All clients must are processed. */
    serverAssert(listLength(t->processing_clients) == 0);
    return processed;
}

In the read path readQueryFromClient() -> processInputBuffer(), once a complete command has been parsed, the following logic is reached. As you can see, the I/O thread is responsible for looking up the command and checking that the argument count (arity) is valid.

if (c->running_tid != IOTHREAD_MAIN_THREAD_ID) {
    c->io_flags |= CLIENT_IO_PENDING_COMMAND;
    c->iolookedcmd = lookupCommand(c->argv, c->argc);
    if (c->iolookedcmd && !commandCheckArity(c->iolookedcmd, c->argc, NULL)) {
        /* The command was found, but the arity is invalid, reset it and let main
         * thread handle. To avoid memory prefetching on an invalid command. */
        c->iolookedcmd = NULL;
    }
    c->slot = getSlotFromCommand(c->iolookedcmd, c->argv, c->argc);
    enqueuePendingClientsToMainThread(c, 0);
    break;
}

Once the command has been parsed, the client is handed over to the main thread. Note the line c->io_flags &= ~(CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED);
after these flags are cleared, the I/O thread will not perform any socket read or write on this client, which avoids race conditions.

/* When IO threads read a complete query of clients or want to free clients, it
 * should remove it from its clients list and put the client in the list to main
 * thread, we will send these clients to main thread in IOThreadBeforeSleep. */
void enqueuePendingClientsToMainThread(client *c, int unbind) {
    /* If the IO thread may no longer manage it, such as closing client, we should
     * unbind client from event loop, so main thread doesn't need to do it costly. */
    if (unbind) connUnbindEventLoop(c->conn);
    /* Just skip if it already is transferred. */
    if (c->io_thread_client_list_node) {
        IOThread *t = &IOThreads[c->tid];
        /* If there are several clients to process, let the main thread handle them ASAP.
         * Since the client being added to the queue may still need to be processed by
         * the IO thread, we must call this before adding it to the queue to avoid
         * races with the main thread. */
        sendPendingClientsToMainThreadIfNeeded(t, 1);
        /* Disable read and write to avoid race when main thread processes. */
        c->io_flags &= ~(CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED);
        /* Remove the client from IO thread, add it to main thread's pending list. */
        listUnlinkNode(t->clients, c->io_thread_client_list_node);
        listLinkNodeTail(t->pending_clients_to_main_thread, c->io_thread_client_list_node);
        c->io_thread_client_list_node = NULL;
    }
}

That covers the core flow of the I/O thread, and it is not complicated: when a new client is first handed over, its fd is registered with the I/O thread's epoll once; from then on, the I/O thread manages that fd and performs all reads and writes itself, and as soon as it has parsed a complete command it notifies the main thread to process it.

Main thread logic

First, look at the handler for the main thread's listening socket, i.e. how it distributes accepted clients.

The call chain is clientAcceptHandler() -> assignClientToIOThread(); as shown below, the I/O thread with the fewest clients is chosen to take over the new client.

/* When the main thread accepts a new client or transfers clients to IO threads,
 * it assigns the client to the IO thread with the fewest clients. */
void assignClientToIOThread(client *c) {
    serverAssert(c->tid == IOTHREAD_MAIN_THREAD_ID);
    /* Find the IO thread with the fewest clients. */
    int min_id = 0;
    int min = INT_MAX;
    for (int i = 1; i < server.io_threads_num; i++) {
        if (server.io_threads_clients_num[i] < min) {
            min = server.io_threads_clients_num[i];
            min_id = i;
        }
    }

    /* Assign the client to the IO thread. */
    server.io_threads_clients_num[c->tid]--;
    c->tid = min_id;
    c->running_tid = min_id;
    server.io_threads_clients_num[min_id]++;

    /* The client running in IO thread needs to have deferred objects array. */
    c->deferred_objects = zmalloc(sizeof(robj*) * CLIENT_MAX_DEFERRED_OBJECTS);

    /* Unbind connection of client from main thread event loop, disable read and
     * write, and then put it in the list, main thread will send these clients
     * to IO thread in beforeSleep. */
    connUnbindEventLoop(c->conn);
    c->io_flags &= ~(CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED);
    listAddNodeTail(mainThreadPendingClientsToIOThreads[c->tid], c);
}

Then, in beforeSleep(), sendPendingClientsToIOThreads() actually hands the clients over in batches: they are appended to t->pending_clients and the eventfd is triggered.

/* Add the pending clients to the list of IO threads, and trigger an event to
 * notify io threads to handle. */
int sendPendingClientsToIOThreads(void) {
    int processed = 0;
    for (int i = 1; i < server.io_threads_num; i++) {
        int len = listLength(mainThreadPendingClientsToIOThreads[i]);
        if (len > 0) {
            IOThread *t = &IOThreads[i];
            pthread_mutex_lock(&t->pending_clients_mutex);
            listJoin(t->pending_clients, mainThreadPendingClientsToIOThreads[i]);
            pthread_mutex_unlock(&t->pending_clients_mutex);
            /* Trigger an event, maybe an error is returned when buffer is full
             * if using pipe, but no worry, io thread will handle all clients
             * in list when receiving a notification. */
            triggerEventNotifier(t->pending_clients_notifier);
        }
        processed += len;
    }
    return processed;
}

Next, let's look at how the two sides interact when an I/O thread has finished its I/O work, for example has parsed a complete command, and hands the client back to the main thread.

During I/O thread initialization, the following handler was registered on the eventfd watched by the main thread:

/* When the io thread finishes processing the client with the read event, it will
 * notify the main thread through event triggering in IOThreadBeforeSleep. The main
 * thread handles the event through this function. */
void handleClientsFromIOThread(struct aeEventLoop *el, int fd, void *ptr, int mask) {
    ...
    IOThread *t = ptr;

    /* Handle fd event first. */
    serverAssert(fd == getReadEventFd(mainThreadPendingClientsNotifiers[t->id]));
    handleEventNotifier(mainThreadPendingClientsNotifiers[t->id]);

    /* Process the clients from IO threads. */
    processClientsFromIOThread(t);
}

The most important function, where commands are actually processed, is shown below. It takes clients from mainThreadPendingClients one by one and executes the command pending on each of them. After each client is done, it is re-queued onto mainThreadPendingClientsToIOThreads; once enough have accumulated, sendPendingClientsToIOThreadIfNeeded() moves the batch to t->pending_clients and notifies the I/O thread via the eventfd so it can write the replies.


/* The main thread processes the clients from IO threads, these clients may have
 * a complete command to execute or need to be freed. Note that IO threads never
 * free client since this operation access much server data.
 *
 * Please notice that this function may be called reentrantly, i,e, the same goes
 * for handleClientsFromIOThread and processClientsOfAllIOThreads. For example,
 * when processing script command, it may call processEventsWhileBlocked to
 * process new events, if the clients with fired events from the same io thread,
 * it may call this function reentrantly. */
int processClientsFromIOThread(IOThread *t) {
    /* Get the list of clients to process. */
    pthread_mutex_lock(&mainThreadPendingClientsMutexes[t->id]);
    listJoin(mainThreadProcessingClients[t->id], mainThreadPendingClients[t->id]);
    pthread_mutex_unlock(&mainThreadPendingClientsMutexes[t->id]);
    size_t processed = listLength(mainThreadProcessingClients[t->id]);
    if (processed == 0) return 0;
    ...

    listNode *node = NULL;
    while (listLength(mainThreadProcessingClients[t->id])) {
        ...
        /* Each time we pop up only the first client to process to guarantee
         * reentrancy safety. */
        if (node) zfree(node);
        node = listFirst(mainThreadProcessingClients[t->id]);
        listUnlinkNode(mainThreadProcessingClients[t->id], node);
        client *c = listNodeValue(node);

        ...
        /* Let main thread to run it, set running thread id first. */
        c->running_tid = IOTHREAD_MAIN_THREAD_ID;

        /* If a read error occurs, handle it in the main thread first, since we
         * want to print logs about client information before freeing. */
        if (c->read_error) handleClientReadError(c);

        /* The client is asked to close in IO thread. */
        if (c->io_flags & CLIENT_IO_CLOSE_ASAP) {
            freeClient(c);
            continue;
        }

        ...

        /* This is where the command is actually executed. */
        /* Process the pending command and input buffer. */
        if (!c->read_error && c->io_flags & CLIENT_IO_PENDING_COMMAND) {
            c->flags |= CLIENT_PENDING_COMMAND;
            if (processPendingCommandAndInputBuffer(c) == C_ERR) {
                /* If the client is no longer valid, it must be freed safely. */
                continue;
            }
        }

        /* We may have pending replies if io thread may not finish writing
         * reply to client, so we did not put the client in pending write
         * queue. And we should do that first since we may keep the client
         * in main thread instead of returning to io threads. */
        if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
            putClientInPendingWriteQueue(c);

        ...

        /* Remove this client from pending write clients queue of main thread,
         * And some clients may do not have reply if CLIENT REPLY OFF/SKIP. */
        if (c->flags & CLIENT_PENDING_WRITE) {
            c->flags &= ~CLIENT_PENDING_WRITE;
            listUnlinkNode(server.clients_pending_write, &c->clients_pending_write_node);
        }
        c->running_tid = c->tid;
        listLinkNodeHead(mainThreadPendingClientsToIOThreads[c->tid], node);
        node = NULL;

        /* If there are several clients to process, let io thread handle them ASAP. */
        /* Note that clients are handed back in batches as they are processed, not only
         * after all of them are done, to keep the threads working in parallel as much
         * as possible. */
        sendPendingClientsToIOThreadIfNeeded(t, 1);
    }
    if (node) zfree(node);

    /* Send the clients to io thread without pending size check, since main thread
     * may process clients from other io threads, so we need to send them to the
     * io thread to process in prallel. */
    sendPendingClientsToIOThreadIfNeeded(t, 0);

    return processed;
}

Valkey's multi-threading

I also took a quick look at Valkey's I/O multi-threading implementation, and found that it differs considerably from Redis 8's.

Redis 8 hands clients over to the I/O threads for management, i.e. the I/O threads run their own epoll. The main thread completely offloads the cost of managing and servicing epoll.

Valkey, on the other hand, has the main thread wrap the work into individual jobs and dispatch them to the I/O threads through lock-free queues (a minimal sketch of such a lock-free handoff appears after this list). The job types include, for example:

  • read I/O and command parsing
  • write I/O
  • memory release (freeing objects)

The main thread's epoll still manages all the fds.
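
To contrast with Redis 8's event-loop handoff, here is a minimal, self-contained sketch of the kind of single-producer/single-consumer lock-free queue such a job-dispatch design can use. The names are invented; this is not Valkey's actual code, only an illustration of the lock-free handoff idea:

#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

/* A minimal single-producer/single-consumer lock-free ring buffer: the main
 * thread (producer) pushes jobs, one I/O thread (consumer) pops them. */
#define RING_SIZE 64                               /* must be a power of two */

struct job { int type; int payload; };             /* hypothetical job */

static struct job ring[RING_SIZE];
static atomic_size_t head;                         /* next slot to write (producer) */
static atomic_size_t tail;                         /* next slot to read (consumer) */

static int push_job(struct job j) {                /* called by the main thread */
    size_t h = atomic_load_explicit(&head, memory_order_relaxed);
    size_t t = atomic_load_explicit(&tail, memory_order_acquire);
    if (h - t == RING_SIZE) return 0;              /* queue full */
    ring[h & (RING_SIZE - 1)] = j;
    atomic_store_explicit(&head, h + 1, memory_order_release);
    return 1;
}

static int pop_job(struct job *out) {              /* called by the I/O thread */
    size_t t = atomic_load_explicit(&tail, memory_order_relaxed);
    size_t h = atomic_load_explicit(&head, memory_order_acquire);
    if (t == h) return 0;                          /* queue empty */
    *out = ring[t & (RING_SIZE - 1)];
    atomic_store_explicit(&tail, t + 1, memory_order_release);
    return 1;
}

static void *io_worker(void *arg) {
    (void)arg;
    struct job j;
    for (int done = 0; done < 3; ) {               /* this toy handles 3 jobs then exits */
        if (pop_job(&j)) {
            printf("io thread: job type=%d payload=%d\n", j.type, j.payload);
            done++;
        }
    }
    return NULL;
}

int main(void) {
    pthread_t tid;
    pthread_create(&tid, NULL, io_worker, NULL);
    for (int i = 0; i < 3; i++)
        while (!push_job((struct job){ .type = i % 2, .payload = i })) ;
    pthread_join(tid, NULL);
    return 0;
}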

Redis 8 does have one CPU-waste issue, though: when an I/O thread hands a client back to the main thread and waits for the command to be processed, the I/O thread's epoll can effectively busy-loop on that client, i.e. the event keeps firing but no actual I/O is performed, because the code relies on the io_flags bits to decide whether to really perform the read/write.

Overall, I prefer the Redis 8 implementation; it is more thorough and more elegant.