libuv实现了一个线程池,该线程池在用户提交了第一个任务的时候初始化,而不是系统启动的时候就初始化。入口代码如下。
static void init_once(void) { #ifndef _WIN32 /* Re-initialize the threadpool after fork. * Note that this discards the global mutex and condition as well * as the work queue. */ if (pthread_atfork(NULL, NULL, &reset_once)) abort(); #endif init_threads(); } // 给线程池提交一个任务 void uv__work_submit(uv_loop_t* loop, struct uv__work* w, enum uv__work_kind kind, void (*work)(struct uv__work* w), void (*done)(struct uv__work* w, int status)) { // 保证已经初始化线程,并只执行一次 uv_once(&once, init_once); w->loop = loop; w->work = work; w->done = done; post(&w->wq, kind); }
下面我们直接看一下线程池的初始化函数。
static void init_threads(void) { unsigned int i; const char* val; uv_sem_t sem; nthreads = ARRAY_SIZE(default_threads); val = getenv("UV_THREADPOOL_SIZE"); if (val != NULL) nthreads = atoi(val); if (nthreads == 0) nthreads = 1; if (nthreads > MAX_THREADPOOL_SIZE) nthreads = MAX_THREADPOOL_SIZE; threads = default_threads; if (nthreads > ARRAY_SIZE(default_threads)) { threads = uv__malloc(nthreads * sizeof(threads[0])); if (threads == NULL) { nthreads = ARRAY_SIZE(default_threads); threads = default_threads; } } // 初始化条件变量 if (uv_cond_init(&cond)) abort(); // 初始化互斥变量 if (uv_mutex_init(&mutex)) abort(); // 初始化三个队列 QUEUE_INIT(&wq); QUEUE_INIT(&slow_io_pending_wq); QUEUE_INIT(&run_slow_work_message); // 初始化信号量变量,值为0 if (uv_sem_init(&sem, 0)) abort(); // 创建多个线程 for (i = 0; i < nthreads; i++) if (uv_thread_create(threads + i, worker, &sem)) abort(); // 等待sem信号量为非0的时候减去一,为0则阻塞 for (i = 0; i < nthreads; i++) uv_sem_wait(&sem); uv_sem_destroy(&sem); }
大致上就是初始化各种变量,根据配置创建多个线程,每个线程的工作函数是worker。到这,就完成了线程池的创建,接下来我们看一下如何给线程池提交一个任务。有两种方式,libuv内部使用的是uv__work_submit函数。
// 给线程池提交一个任务 void uv__work_submit(uv_loop_t* loop, struct uv__work* w, enum uv__work_kind kind, void (*work)(struct uv__work* w), void (*done)(struct uv__work* w, int status)) { // 保证已经初始化线程,并只执行一次 uv_once(&once, init_once); w->loop = loop; w->work = work; w->done = done; post(&w->wq, kind); }
用户使用的是uv_queue_work
int uv_queue_work(uv_loop_t* loop, uv_work_t* req, uv_work_cb work_cb, uv_after_work_cb after_work_cb) { if (work_cb == NULL) return UV_EINVAL; uv__req_init(loop, req, UV_WORK); req->loop = loop; req->work_cb = work_cb; req->after_work_cb = after_work_cb; uv__work_submit(loop, &req->work_req, UV__WORK_CPU, uv__queue_work, uv__queue_done); return 0; }
不过这两种方式区别不大,只是做了些封装。可以这两种方式最后都是通过post函数进行提交任务的。所以我们继续看post的代码。
// 把任务插入队列等待线程处理 static void post(QUEUE* q, enum uv__work_kind kind) { uv_mutex_lock(&mutex); // 类型是慢IO if (kind == UV__WORK_SLOW_IO) { /* Insert into a separate queue. */ // 插入慢IO对应的队列 QUEUE_INSERT_TAIL(&slow_io_pending_wq, q); /* 有慢IO任务的时候,需要给主队列wq插入一个消息节点run_slow_work_message, 说明有慢IO任务,所以如果run_slow_work_message是空,说明还没有插入主队列。 需要进行q = &run_slow_work_message;赋值,然后把run_slow_work_message插入 主队列 */ if (!QUEUE_EMPTY(&run_slow_work_message)) { /* Running slow I/O tasks is already scheduled => Nothing to do here. The worker that runs said other task will schedule this one as well. */ uv_mutex_unlock(&mutex); return; } q = &run_slow_work_message; } // 把节点插入主队列,可能是慢IO消息节点或者一般任务 QUEUE_INSERT_TAIL(&wq, q); // 有空闲线程则唤醒他 if (idle_threads > 0) uv_cond_signal(&cond); uv_mutex_unlock(&mutex); }
post根据任务的类型,把节点插入到相应的队列。到这里,就完成了任务的提交,接下来就是等待任务的执行和完成。每个线程的工作函数是worker。我们看看代码。
static void worker(void* arg) { struct uv__work* w; QUEUE* q; int is_slow_work; uv_sem_post((uv_sem_t*) arg); arg = NULL; uv_mutex_lock(&mutex); for (;;) { /* `mutex` should always be locked at this point. */ /* Keep waiting while either no work is present or only slow I/O and we're at the threshold for that. */ /* 如果队列为空,或者不为空,但是队列里只有慢IO任务且正在执行的慢IO任务个数达到阈值, 则空闲函数加一,等待费慢IO任务,防止慢IO占用资源线程,导致其他快的任务无法得到执行 */ while (QUEUE_EMPTY(&wq) || (QUEUE_HEAD(&wq) == &run_slow_work_message && QUEUE_NEXT(&run_slow_work_message) == &wq && slow_io_work_running >= slow_work_thread_threshold())) { idle_threads += 1; // 阻塞,等待唤醒 uv_cond_wait(&cond, &mutex); idle_threads -= 1; } // 取出头结点,头指点可能是退出消息、慢IO,一般请求 q = QUEUE_HEAD(&wq); // 如果头结点是退出消息,则结束线程 if (q == &exit_message) { uv_cond_signal(&cond); uv_mutex_unlock(&mutex); break; } // 移除节点 QUEUE_REMOVE(q); QUEUE_INIT(q); /* Signal uv_cancel() that the work req is executing. */ is_slow_work = 0; // 如果当前节点等于慢IO节点 if (q == &run_slow_work_message) { /* If we're at the slow I/O threshold, re-schedule until after all other work in the queue is done. */ // 遇到阈值,重新入队 if (slow_io_work_running >= slow_work_thread_threshold()) { QUEUE_INSERT_TAIL(&wq, q); continue; } /* If we encountered a request to run slow I/O work but there is none to run, that means it's cancelled => Start over. */ // 没有慢IO任务则继续 if (QUEUE_EMPTY(&slow_io_pending_wq)) continue; // 处理慢IO任务 is_slow_work = 1; // 正在处理慢IO任务的个数累加,用于其他线程判断慢IO任务个数是否达到阈值 slow_io_work_running++; // 取出任务 q = QUEUE_HEAD(&slow_io_pending_wq); QUEUE_REMOVE(q); QUEUE_INIT(q); /* If there is more slow I/O work, schedule it to be run as well. */ // 取出一个任务后,如果还有满IO任务则把慢IO标记节点重新入队,表示还有满IO任务,因为上面把该标记节点出队了 if (!QUEUE_EMPTY(&slow_io_pending_wq)) { QUEUE_INSERT_TAIL(&wq, &run_slow_work_message); // 有空闲线程则唤醒他,因为还有任务处理 if (idle_threads > 0) uv_cond_signal(&cond); } } uv_mutex_unlock(&mutex); // q是慢IO或者一般任务 w = QUEUE_DATA(q, struct uv__work, wq); // 执行业务回调,该函数一般会阻塞 w->work(w); uv_mutex_lock(&w->loop->wq_mutex); w->work = NULL; /* Signal uv_cancel() that the work req is done executing. */ // 执行完任务,插入到loop的wq队列,在uv__work_done的时候会执行该队列的节点 QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq); // 通知loop的wq_async节点 uv_async_send(&w->loop->wq_async); uv_mutex_unlock(&w->loop->wq_mutex); /* Lock `mutex` since that is expected at the start of the next * iteration. */ uv_mutex_lock(&mutex); // 执行完满IO任务,记录正在执行的慢IO个数变量减1 if (is_slow_work) { /* `slow_io_work_running` is protected by `mutex`. */ slow_io_work_running--; } } }
worker函数就是死循环从队列里取出任务执行,执行完之后通知主线程。还有一些关于慢IO任务的优化。因为提交任务有两种方式,所以执行任务时对应的函数也不一样。如果是用户通过uv_queue_work提交的,对应的的执行函数就是。
static void uv__queue_work(struct uv__work* w) { uv_work_t* req = container_of(w, uv_work_t, work_req); req->work_cb(req); }
其实也只是做了层封装,wrok_cb对应的就是用户设置的函数。工作函数一般是阻塞的,所以会导致线程的阻塞,这就是线程池的意义。一个线程挂起,另一个可以继续执行任务。等待阻塞返回时,线程会通知主线程。重点work函数里的这两句代码。
// 执行完任务,插入到loop的wq队列,在uv__work_done的时候会执行该队列的节点 QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq); // 通知loop的wq_async节点 uv_async_send(&w->loop->wq_async);
我们看一下uv_async_send的代码。
int uv_async_send(uv_async_t* handle) { /* Do a cheap read first. */ if (ACCESS_ONCE(int, handle->pending) != 0) return 0; // 如pending是0,则设置为1,返回0,如果是1则返回1,所以如果多次调用该函数是会被合并的 if (cmpxchgi(&handle->pending, 0, 1) == 0) uv__async_send(handle->loop); return 0; } static void uv__async_send(uv_loop_t* loop) { const void* buf; ssize_t len; int fd; int r; buf = ""; len = 1; fd = loop->async_wfd; #if defined(__linux__) // 说明用的是eventfd而不是管道 if (fd == -1) { static const uint64_t val = 1; buf = &val; len = sizeof(val); // 见uv__async_start fd = loop->async_io_watcher.fd; /* eventfd */ } #endif // 通知读端 do r = write(fd, buf, len); while (r == -1 && errno == EINTR); if (r == len) return; if (r == -1) if (errno == EAGAIN || errno == EWOULDBLOCK) return; abort(); }
子线程给主线程的wq_async对应的IO观察者发送消息。然后在poll IO阶段,epoll_wai就会监听到这个事件,从而执行回调函数。由uv_loop_init初始化时代码可知,回调函数是uv__work_done。
uv_loop_init函数: err = uv_async_init(loop, &loop->wq_async, uv__work_done); void uv__work_done(uv_async_t* handle) { struct uv__work* w; uv_loop_t* loop; QUEUE* q; QUEUE wq; int err; loop = container_of(handle, uv_loop_t, wq_async); uv_mutex_lock(&loop->wq_mutex); // 把loop->wq队列的节点全部移到wp变量中,wq的队列在线程处理函数work里进行设置 QUEUE_MOVE(&loop->wq, &wq); uv_mutex_unlock(&loop->wq_mutex); while (!QUEUE_EMPTY(&wq)) { q = QUEUE_HEAD(&wq); QUEUE_REMOVE(q); w = container_of(q, struct uv__work, wq); err = (w->work == uv__cancelled) ? UV_ECANCELED : 0; // 执行回调 w->done(w, err); } }
这里又和提交任务的方式有关,如果是用户通过uv_queue_work提交的任务,这时候的回调uv__queue_done。该函数也是做了一些封装。
static void uv__queue_done(struct uv__work* w, int err) { uv_work_t* req; req = container_of(w, uv_work_t, work_req); uv__req_unregister(req->loop, req); if (req->after_work_cb == NULL) return; // 用户设置的回调 req->after_work_cb(req, err); }
如果是libuv本身提交的任务。则回调函数即libuv设置的,没有经过封装。最后提一下libuv提供的取消任务函数。
static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) { int cancelled; // 加锁,为了把节点移出队列 uv_mutex_lock(&mutex); // 加锁,为了判断w->wq是否为空 uv_mutex_lock(&w->loop->wq_mutex); // w在一个队列中并work不为空,则可取消 cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL; // 删除该节点 if (cancelled) QUEUE_REMOVE(&w->wq); uv_mutex_unlock(&w->loop->wq_mutex); uv_mutex_unlock(&mutex); if (!cancelled) return UV_EBUSY; // 重置回调函数 w->work = uv__cancelled; uv_mutex_lock(&loop->wq_mutex); // 插入loop的wq队列 QUEUE_INSERT_TAIL(&loop->wq, &w->wq); // 通知主线程执行任务回调 uv_async_send(&loop->wq_async); uv_mutex_unlock(&loop->wq_mutex); return 0; }
到这,libuv的线程池就解析完了。
posted on 2019-10-14 15:46
长戟十三千 阅读(777)
评论(0) 编辑 收藏 引用 所属分类:
libuv