/* Initialization of state vars and objects */ for (j = 0; j < BIO_NUM_OPS; j++) { pthread_mutex_init(&bio_mutex[j],NULL); pthread_cond_init(&bio_newjob_cond[j],NULL); pthread_cond_init(&bio_step_cond[j],NULL); bio_jobs[j] = listCreate(); bio_pending[j] = 0; }
... /* 省略了设置attr线程栈大小部分代码 */ /* Ready to spawn our threads. We use the single argument the thread * function accepts in order to pass the job ID the thread is * responsible of. */ for (j = 0; j < BIO_NUM_OPS; j++) { void *arg = (void*)(unsignedlong) j; if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) { serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs."); exit(1); } bio_threads[j] = thread; } } /* 后台线程对应事件循环,生产者消费者模式 */ void *bioProcessBackgroundJobs(void *arg) { structbio_job *job; unsignedlong type = (unsignedlong) arg; //取出线程执行的任务类型 sigset_t sigset;
/* pthread_t handle of each I/O thread. */
pthread_t io_threads[IO_THREADS_MAX_NUM];
/* Mutex associated with each I/O thread. */
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
/* Number of clients each thread still has to process. */
threads_pending io_threads_pending[IO_THREADS_MAX_NUM];
/* This is the list of clients each thread will serve when threaded I/O is
 * used: one list of clients to process per I/O thread. */
list *io_threads_list[IO_THREADS_MAX_NUM];
/* Initialize the data structures needed for threaded I/O. */ voidinitThreadedIO(void){ server.io_threads_active = 0; /* We start with threads not active. */
/* Indicate that io-threads are currently idle */ io_threads_op = IO_THREADS_OP_IDLE;
/* Don't spawn any thread if the user selected a single thread: * we'll handle I/O directly from the main thread. */ if (server.io_threads_num == 1) return;
if (server.io_threads_num > IO_THREADS_MAX_NUM) { serverLog(LL_WARNING,"Fatal: too many I/O threads configured. " "The maximum number is %d.", IO_THREADS_MAX_NUM); exit(1); }
/* Spawn and initialize the I/O threads. */ for (int i = 0; i < server.io_threads_num; i++) { /* Things we do for all the threads including the main thread. */ io_threads_list[i] = listCreate(); if (i == 0) continue; /* Thread 0 is the main thread. */
/* Things we do only for the additional threads. */ pthread_t tid; pthread_mutex_init(&io_threads_mutex[i],NULL); setIOPendingCount(i, 0); /* 这个锁会让I/O线程先进入不了循环,等待主线程给任务给他 */ //会在handleClientsWithPendingWritesUsingThreads()的startThreadedIO()解锁 pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */ if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) { serverLog(LL_WARNING,"Fatal: Can't initialize IO thread."); exit(1); } io_threads[i] = tid; } }
void *IOThreadMain(void *myid){ /* The ID is the thread number (from 0 to server.iothreads_num-1), and is * used by the thread to just manipulate a single sub-array of clients. */ long id = (unsignedlong)myid; char thdname[16];
/* Give the main thread a chance to stop this thread. */ if (getIOPendingCount(id) == 0) { // 一般来说,会在这里阻塞,因为主线程在分配任务前持有锁 pthread_mutex_lock(&io_threads_mutex[id]); pthread_mutex_unlock(&io_threads_mutex[id]); continue; }
serverAssert(getIOPendingCount(id) != 0);
/* Process: note that the main thread will never touch our list * before we drop the pending count to 0. */ listIter li; listNode *ln; listRewind(io_threads_list[id],&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); if (io_threads_op == IO_THREADS_OP_WRITE) { writeToClient(c,0); } elseif (io_threads_op == IO_THREADS_OP_READ) { readQueryFromClient(c->conn); } else { serverPanic("io_threads_op value is unknown"); } } listEmpty(io_threads_list[id]); setIOPendingCount(id, 0); } }
/* For main thread: per-I/O-thread bookkeeping, indexed by thread id. */
static list *mainThreadPendingClientsToIOThreads[IO_THREADS_MAX_NUM]; /* Clients to IO threads */
static list *mainThreadProcessingClients[IO_THREADS_MAX_NUM]; /* Clients in processing */
static list *mainThreadPendingClients[IO_THREADS_MAX_NUM]; /* Pending clients from IO threads */
static pthread_mutex_t mainThreadPendingClientsMutexes[IO_THREADS_MAX_NUM]; /* Mutex for pending clients */
static eventNotifier *mainThreadPendingClientsNotifiers[IO_THREADS_MAX_NUM]; /* Notifier for pending clients */
/* This is the timer callback of the IO thread, used to gradually handle * some background operations, such as clients cron. */ if (aeCreateTimeEvent(t->el, 1, IOThreadCron, t, NULL) == AE_ERR) { serverLog(LL_WARNING, "Fatal: Can't create event loop timers in IO thread."); exit(1); }
/* After the main thread processes the clients, it will send the clients back to
 * io threads to handle, and fire an event, the io thread handles the event by
 * this function.
 *
 * ptr is the IOThread the event belongs to; ae, fd and mask are part of the
 * event-handler signature and are not used here. */
void handleClientsFromMainThread(struct aeEventLoop *ae, int fd, void *ptr, int mask) {
    (void)ae;
    (void)fd;
    (void)mask;

    IOThread *t = ptr;

    handleEventNotifier(t->pending_clients_notifier);
    /* Process the clients from main thread. */
    processClientsFromMainThread(t);
}
/* Processing clients that have finished executing commands from the main thread.
 * If the client is not bound to the event loop, we should bind it first and
 * install read handler. If the client still has query buffer, we should process
 * the input buffer. If the client has pending reply, we just reply to client,
 * and then install write handler if needed.
 *
 * Returns the number of clients taken over from t->pending_clients in this
 * call (0 when there was nothing to do). */
int processClientsFromMainThread(IOThread *t) {
    /* Move the newly arrived clients into this thread's private queue while
     * holding the mutex; they are then processed one by one without it. */
    pthread_mutex_lock(&t->pending_clients_mutex);
    listJoin(t->processing_clients, t->pending_clients);
    pthread_mutex_unlock(&t->pending_clients_mutex);

    size_t processed = listLength(t->processing_clients);
    if (processed == 0) return 0;

    listIter li;
    listNode *ln;
    listRewind(t->processing_clients, &li);
    while ((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        serverAssert(!(c->io_flags & (CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED)));
        /* Main thread must handle clients with CLIENT_CLOSE_ASAP flag, since
         * we only set io_flags when clients in io thread are freed ASAP. */
        serverAssert(!(c->flags & CLIENT_CLOSE_ASAP));

        /* Link client in IO thread clients list first. The list node itself
         * is moved, so no allocation happens here. */
        serverAssert(c->io_thread_client_list_node == NULL);
        listUnlinkNode(t->processing_clients, ln);
        listLinkNodeTail(t->clients, ln);
        c->io_thread_client_list_node = listLast(t->clients);

        /* The client now is in the IO thread, let's free deferred objects. */
        freeClientDeferredObjects(c, 0);

        /* ... (further per-client handling is elided in this excerpt) ... */

        /* Only bind once, we never remove read handler unless freeing client. */
        if (!connHasEventLoop(c->conn)) {
            connRebindEventLoop(c->conn, t->el);
            serverAssert(!connHasReadHandler(c->conn));
            /* Bind the client's connection to this thread's event loop; the
             * read callback is readQueryFromClient. */
            connSetReadHandler(c->conn, readQueryFromClient);
        }

        /* If the client has pending replies, write replies to client. */
        if (clientHasPendingReplies(c)) {
            writeToClient(c, 0);
            if (!(c->io_flags & CLIENT_IO_CLOSE_ASAP) && clientHasPendingReplies(c)) {
                /* The reply was not fully written: install the write
                 * handler so the rest is sent on the writable event. */
                connSetWriteHandler(c->conn, sendReplyToClient);
            }
        }
    }
    /* All clients must have been processed. */
    serverAssert(listLength(t->processing_clients) == 0);
    return processed;
}
if (c->running_tid != IOTHREAD_MAIN_THREAD_ID) { c->io_flags |= CLIENT_IO_PENDING_COMMAND; c->iolookedcmd = lookupCommand(c->argv, c->argc); if (c->iolookedcmd && !commandCheckArity(c->iolookedcmd, c->argc, NULL)) { /* The command was found, but the arity is invalid, reset it and let main * thread handle. To avoid memory prefetching on an invalid command. */ c->iolookedcmd = NULL; } c->slot = getSlotFromCommand(c->iolookedcmd, c->argv, c->argc); enqueuePendingClientsToMainThread(c, 0); break; }
/* When IO threads read a complete query of clients or want to free clients, it
 * should remove it from its clients list and put the client in the list to main
 * thread, we will send these clients to main thread in IOThreadBeforeSleep.
 *
 * c is the client to hand over. When unbind is non-zero the client's
 * connection is also detached from the event loop. */
void enqueuePendingClientsToMainThread(client *c, int unbind) {
    /* If the IO thread may no longer manage it, such as closing client, we should
     * unbind client from event loop, so main thread doesn't need to do it costly. */
    if (unbind) connUnbindEventLoop(c->conn);

    /* Just skip if it already is transferred. */
    if (c->io_thread_client_list_node) {
        IOThread *t = &IOThreads[c->tid];

        /* If there are several clients to process, let the main thread handle them ASAP.
         * Since the client being added to the queue may still need to be processed by
         * the IO thread, we must call this before adding it to the queue to avoid
         * races with the main thread. */
        sendPendingClientsToMainThreadIfNeeded(t, 1);

        /* Disable read and write to avoid race when main thread processes. */
        c->io_flags &= ~(CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED);

        /* Remove the client from IO thread, add it to main thread's pending list. */
        listUnlinkNode(t->clients, c->io_thread_client_list_node);
        listLinkNodeTail(t->pending_clients_to_main_thread, c->io_thread_client_list_node);
        c->io_thread_client_list_node = NULL;
    }
}
/* When the main thread accepts a new client or transfers clients to IO threads,
 * it assigns the client to the IO thread with the fewest clients.
 *
 * The client must currently belong to the main thread (asserted). The search
 * starts at index 1, so the main thread (index 0) is never a candidate.
 * NOTE(review): if server.io_threads_num <= 1 the loop does not run and
 * min_id stays 0 -- presumably callers only get here with IO threads
 * enabled; confirm against callers. */
void assignClientToIOThread(client *c) {
    serverAssert(c->tid == IOTHREAD_MAIN_THREAD_ID);

    /* Find the IO thread with the fewest clients. */
    int min_id = 0;
    int min = INT_MAX;
    for (int i = 1; i < server.io_threads_num; i++) {
        if (server.io_threads_clients_num[i] < min) {
            min = server.io_threads_clients_num[i];
            min_id = i;
        }
    }

    /* Assign the client to the IO thread. */
    server.io_threads_clients_num[c->tid]--;
    c->tid = min_id;
    c->running_tid = min_id;
    server.io_threads_clients_num[min_id]++;

    /* The client running in IO thread needs to have deferred objects array. */
    c->deferred_objects = zmalloc(sizeof(robj*) * CLIENT_MAX_DEFERRED_OBJECTS);

    /* Unbind connection of client from main thread event loop, disable read and
     * write, and then put it in the list, main thread will send these clients
     * to IO thread in beforeSleep. */
    connUnbindEventLoop(c->conn);
    c->io_flags &= ~(CLIENT_IO_READ_ENABLED | CLIENT_IO_WRITE_ENABLED);
    listAddNodeTail(mainThreadPendingClientsToIOThreads[c->tid], c);
}
/* Add the pending clients to the list of IO threads, and trigger an event to * notify io threads to handle. */ intsendPendingClientsToIOThreads(void){ int processed = 0; for (int i = 1; i < server.io_threads_num; i++) { int len = listLength(mainThreadPendingClientsToIOThreads[i]); if (len > 0) { IOThread *t = &IOThreads[i]; pthread_mutex_lock(&t->pending_clients_mutex); listJoin(t->pending_clients, mainThreadPendingClientsToIOThreads[i]); pthread_mutex_unlock(&t->pending_clients_mutex); /* Trigger an event, maybe an error is returned when buffer is full * if using pipe, but no worry, io thread will handle all clients * in list when receiving a notification. */ triggerEventNotifier(t->pending_clients_notifier); } processed += len; } return processed; }
/* When the io thread finishes processing the client with the read event, it will * notify the main thread through event triggering in IOThreadBeforeSleep. The main * thread handles the event through this function. */ voidhandleClientsFromIOThread(struct aeEventLoop *el, int fd, void *ptr, int mask){ ... IOThread *t = ptr;
/* The main thread processes the clients from IO threads, these clients may have * a complete command to execute or need to be freed. Note that IO threads never * free client since this operation access much server data. * * Please notice that this function may be called reentrantly, i,e, the same goes * for handleClientsFromIOThread and processClientsOfAllIOThreads. For example, * when processing script command, it may call processEventsWhileBlocked to * process new events, if the clients with fired events from the same io thread, * it may call this function reentrantly. */ intprocessClientsFromIOThread(IOThread *t){ /* Get the list of clients to process. */ pthread_mutex_lock(&mainThreadPendingClientsMutexes[t->id]); listJoin(mainThreadProcessingClients[t->id], mainThreadPendingClients[t->id]); pthread_mutex_unlock(&mainThreadPendingClientsMutexes[t->id]); size_t processed = listLength(mainThreadProcessingClients[t->id]); if (processed == 0) return0; ...
listNode *node = NULL; while (listLength(mainThreadProcessingClients[t->id])) { ... /* Each time we pop up only the first client to process to guarantee * reentrancy safety. */ if (node) zfree(node); node = listFirst(mainThreadProcessingClients[t->id]); listUnlinkNode(mainThreadProcessingClients[t->id], node); client *c = listNodeValue(node);
... /* Let main thread to run it, set running thread id first. */ c->running_tid = IOTHREAD_MAIN_THREAD_ID;
/* If a read error occurs, handle it in the main thread first, since we * want to print logs about client information before freeing. */ if (c->read_error) handleClientReadError(c);
/* The client is asked to close in IO thread. */ if (c->io_flags & CLIENT_IO_CLOSE_ASAP) { freeClient(c); continue; }
...
// 实际的命令处理位置 /* Process the pending command and input buffer. */ if (!c->read_error && c->io_flags & CLIENT_IO_PENDING_COMMAND) { c->flags |= CLIENT_PENDING_COMMAND; if (processPendingCommandAndInputBuffer(c) == C_ERR) { /* If the client is no longer valid, it must be freed safely. */ continue; } }
/* We may have pending replies if io thread may not finish writing * reply to client, so we did not put the client in pending write * queue. And we should do that first since we may keep the client * in main thread instead of returning to io threads. */ if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c)) putClientInPendingWriteQueue(c);
...
/* Remove this client from pending write clients queue of main thread, * And some clients may do not have reply if CLIENT REPLY OFF/SKIP. */ if (c->flags & CLIENT_PENDING_WRITE) { c->flags &= ~CLIENT_PENDING_WRITE; listUnlinkNode(server.clients_pending_write, &c->clients_pending_write_node); } c->running_tid = c->tid; listLinkNodeHead(mainThreadPendingClientsToIOThreads[c->tid], node); node = NULL; /* If there are several clients to process, let io thread handle them ASAP. */ // 可以看到并不是全部的client处理完后再让io线程工作,而是处理了一部分就转移一部份,尽可能实现全并行。 sendPendingClientsToIOThreadIfNeeded(t, 1); } if (node) zfree(node);
/* Send the clients to io thread without pending size check, since main thread * may process clients from other io threads, so we need to send them to the * io thread to process in prallel. */ sendPendingClientsToIOThreadIfNeeded(t, 0);