随笔 - 137  文章 - 1  trackbacks - 0
<2020年1月>
2930311234
567891011
12131415161718
19202122232425
2627282930311
2345678

常用链接

留言簿

随笔分类

随笔档案

收藏夹

调试技巧

搜索

  •  

最新评论

阅读排行榜

评论排行榜

libuv实现了一个线程池,该线程池在用户提交了第一个任务的时候初始化,而不是系统启动的时候就初始化。入口代码如下。

static void init_once(void) { #ifndef _WIN32   /* Re-initialize the threadpool after fork.    * Note that this discards the global mutex and condition as well    * as the work queue.    */   if (pthread_atfork(NULL, NULL, &reset_once))     abort(); #endif   init_threads(); }  // 给线程池提交一个任务 void uv__work_submit(uv_loop_t* loop,                      struct uv__work* w,                      enum uv__work_kind kind,                      void (*work)(struct uv__work* w),                      void (*done)(struct uv__work* w, int status)) {   // 保证已经初始化线程,并只执行一次   uv_once(&once, init_once);   w->loop = loop;   w->work = work;   w->done = done;   post(&w->wq, kind); }

下面我们直接看一下线程池的初始化函数。

static void init_threads(void) {   unsigned int i;   const char* val;   uv_sem_t sem;    nthreads = ARRAY_SIZE(default_threads);   val = getenv("UV_THREADPOOL_SIZE");   if (val != NULL)     nthreads = atoi(val);   if (nthreads == 0)     nthreads = 1;   if (nthreads > MAX_THREADPOOL_SIZE)     nthreads = MAX_THREADPOOL_SIZE;    threads = default_threads;   if (nthreads > ARRAY_SIZE(default_threads)) {     threads = uv__malloc(nthreads * sizeof(threads[0]));     if (threads == NULL) {       nthreads = ARRAY_SIZE(default_threads);       threads = default_threads;     }   }   // 初始化条件变量   if (uv_cond_init(&cond))     abort();   // 初始化互斥变量   if (uv_mutex_init(&mutex))     abort();    // 初始化三个队列   QUEUE_INIT(&wq);   QUEUE_INIT(&slow_io_pending_wq);   QUEUE_INIT(&run_slow_work_message);    // 初始化信号量变量,值为0   if (uv_sem_init(&sem, 0))     abort();   // 创建多个线程   for (i = 0; i < nthreads; i++)     if (uv_thread_create(threads + i, worker, &sem))       abort();   // 等待sem信号量为非0的时候减去一,为0则阻塞   for (i = 0; i < nthreads; i++)     uv_sem_wait(&sem);    uv_sem_destroy(&sem); }

大致上就是初始化各种变量,根据配置创建多个线程,每个线程的工作函数是worker。到这,就完成了线程池的创建,接下来我们看一下如何给线程池提交一个任务。有两种方式,libuv内部使用的是uv__work_submit函数。

// 给线程池提交一个任务 void uv__work_submit(uv_loop_t* loop,                      struct uv__work* w,                      enum uv__work_kind kind,                      void (*work)(struct uv__work* w),                      void (*done)(struct uv__work* w, int status)) {   // 保证已经初始化线程,并只执行一次   uv_once(&once, init_once);   w->loop = loop;   w->work = work;   w->done = done;   post(&w->wq, kind); }

用户使用的是uv_queue_work

int uv_queue_work(uv_loop_t* loop,                   uv_work_t* req,                   uv_work_cb work_cb,                   uv_after_work_cb after_work_cb) {   if (work_cb == NULL)     return UV_EINVAL;    uv__req_init(loop, req, UV_WORK);   req->loop = loop;   req->work_cb = work_cb;   req->after_work_cb = after_work_cb;   uv__work_submit(loop,                   &req->work_req,                   UV__WORK_CPU,                   uv__queue_work,                   uv__queue_done);   return 0; }

不过这两种方式区别不大,只是做了些封装。可以这两种方式最后都是通过post函数进行提交任务的。所以我们继续看post的代码。

// 把任务插入队列等待线程处理 static void post(QUEUE* q, enum uv__work_kind kind) {   uv_mutex_lock(&mutex);   // 类型是慢IO   if (kind == UV__WORK_SLOW_IO) {     /* Insert into a separate queue. */     // 插入慢IO对应的队列     QUEUE_INSERT_TAIL(&slow_io_pending_wq, q);     /*       有慢IO任务的时候,需要给主队列wq插入一个消息节点run_slow_work_message,       说明有慢IO任务,所以如果run_slow_work_message是空,说明还没有插入主队列。       需要进行q = &run_slow_work_message;赋值,然后把run_slow_work_message插入       主队列      */     if (!QUEUE_EMPTY(&run_slow_work_message)) {       /* Running slow I/O tasks is already scheduled => Nothing to do here.          The worker that runs said other task will schedule this one as well. */       uv_mutex_unlock(&mutex);       return;     }     q = &run_slow_work_message;   }   // 把节点插入主队列,可能是慢IO消息节点或者一般任务   QUEUE_INSERT_TAIL(&wq, q);   // 有空闲线程则唤醒他   if (idle_threads > 0)     uv_cond_signal(&cond);   uv_mutex_unlock(&mutex); }

post根据任务的类型,把节点插入到相应的队列。到这里,就完成了任务的提交,接下来就是等待任务的执行和完成。每个线程的工作函数是worker。我们看看代码。

static void worker(void* arg) {   struct uv__work* w;   QUEUE* q;   int is_slow_work;    uv_sem_post((uv_sem_t*) arg);   arg = NULL;    uv_mutex_lock(&mutex);   for (;;) {     /* `mutex` should always be locked at this point. */      /* Keep waiting while either no work is present or only slow I/O        and we're at the threshold for that. */     /*       如果队列为空,或者不为空,但是队列里只有慢IO任务且正在执行的慢IO任务个数达到阈值,       则空闲函数加一,等待费慢IO任务,防止慢IO占用资源线程,导致其他快的任务无法得到执行     */     while (QUEUE_EMPTY(&wq) ||            (QUEUE_HEAD(&wq) == &run_slow_work_message &&             QUEUE_NEXT(&run_slow_work_message) == &wq &&             slow_io_work_running >= slow_work_thread_threshold())) {       idle_threads += 1;       // 阻塞,等待唤醒       uv_cond_wait(&cond, &mutex);       idle_threads -= 1;     }     // 取出头结点,头指点可能是退出消息、慢IO,一般请求     q = QUEUE_HEAD(&wq);     // 如果头结点是退出消息,则结束线程     if (q == &exit_message) {       uv_cond_signal(&cond);       uv_mutex_unlock(&mutex);       break;     }     // 移除节点      QUEUE_REMOVE(q);     QUEUE_INIT(q);  /* Signal uv_cancel() that the work req is executing. */      is_slow_work = 0;     // 如果当前节点等于慢IO节点     if (q == &run_slow_work_message) {       /* If we're at the slow I/O threshold, re-schedule until after all          other work in the queue is done. */       // 遇到阈值,重新入队        if (slow_io_work_running >= slow_work_thread_threshold()) {         QUEUE_INSERT_TAIL(&wq, q);         continue;       }        /* If we encountered a request to run slow I/O work but there is none          to run, that means it's cancelled => Start over. */       // 没有慢IO任务则继续       if (QUEUE_EMPTY(&slow_io_pending_wq))         continue;       // 处理慢IO任务       is_slow_work = 1;       // 正在处理慢IO任务的个数累加,用于其他线程判断慢IO任务个数是否达到阈值       slow_io_work_running++;       // 取出任务       q = QUEUE_HEAD(&slow_io_pending_wq);       QUEUE_REMOVE(q);       QUEUE_INIT(q);        /* If there is more slow I/O work, schedule it to be run as well. */       // 取出一个任务后,如果还有满IO任务则把慢IO标记节点重新入队,表示还有满IO任务,因为上面把该标记节点出队了        if (!QUEUE_EMPTY(&slow_io_pending_wq)) {         QUEUE_INSERT_TAIL(&wq, &run_slow_work_message);         // 有空闲线程则唤醒他,因为还有任务处理         if (idle_threads > 0)           uv_cond_signal(&cond);       }     }      uv_mutex_unlock(&mutex);     // q是慢IO或者一般任务     w = QUEUE_DATA(q, struct uv__work, wq);     // 执行业务回调,该函数一般会阻塞     w->work(w);      uv_mutex_lock(&w->loop->wq_mutex);     w->work = NULL;  /* Signal uv_cancel() that the work req is done                         executing. */     // 执行完任务,插入到loop的wq队列,在uv__work_done的时候会执行该队列的节点     QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);     // 通知loop的wq_async节点     uv_async_send(&w->loop->wq_async);     uv_mutex_unlock(&w->loop->wq_mutex);      /* Lock `mutex` since that is expected at the start of the next      * iteration. */     uv_mutex_lock(&mutex);     // 执行完满IO任务,记录正在执行的慢IO个数变量减1     if (is_slow_work) {       /* `slow_io_work_running` is protected by `mutex`. */       slow_io_work_running--;     }   } }

worker函数就是死循环从队列里取出任务执行,执行完之后通知主线程。还有一些关于慢IO任务的优化。因为提交任务有两种方式,所以执行任务时对应的函数也不一样。如果是用户通过uv_queue_work提交的,对应的的执行函数就是。

static void uv__queue_work(struct uv__work* w) {   uv_work_t* req = container_of(w, uv_work_t, work_req);    req->work_cb(req); }

其实也只是做了层封装,wrok_cb对应的就是用户设置的函数。工作函数一般是阻塞的,所以会导致线程的阻塞,这就是线程池的意义。一个线程挂起,另一个可以继续执行任务。等待阻塞返回时,线程会通知主线程。重点work函数里的这两句代码。

    // 执行完任务,插入到loop的wq队列,在uv__work_done的时候会执行该队列的节点     QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);     // 通知loop的wq_async节点     uv_async_send(&w->loop->wq_async);

我们看一下uv_async_send的代码。

int uv_async_send(uv_async_t* handle) {   /* Do a cheap read first. */   if (ACCESS_ONCE(int, handle->pending) != 0)     return 0;   // 如pending是0,则设置为1,返回0,如果是1则返回1,所以如果多次调用该函数是会被合并的   if (cmpxchgi(&handle->pending, 0, 1) == 0)     uv__async_send(handle->loop);    return 0; }   static void uv__async_send(uv_loop_t* loop) {   const void* buf;   ssize_t len;   int fd;   int r;    buf = "";   len = 1;   fd = loop->async_wfd;  #if defined(__linux__)   // 说明用的是eventfd而不是管道   if (fd == -1) {     static const uint64_t val = 1;     buf = &val;     len = sizeof(val);     // 见uv__async_start     fd = loop->async_io_watcher.fd;  /* eventfd */   } #endif   // 通知读端   do     r = write(fd, buf, len);   while (r == -1 && errno == EINTR);    if (r == len)     return;    if (r == -1)     if (errno == EAGAIN || errno == EWOULDBLOCK)       return;    abort(); }

子线程给主线程的wq_async对应的IO观察者发送消息。然后在poll IO阶段,epoll_wai就会监听到这个事件,从而执行回调函数。由uv_loop_init初始化时代码可知,回调函数是uv__work_done。

 uv_loop_init函数: err = uv_async_init(loop, &loop->wq_async, uv__work_done);  void uv__work_done(uv_async_t* handle) {   struct uv__work* w;   uv_loop_t* loop;   QUEUE* q;   QUEUE wq;   int err;    loop = container_of(handle, uv_loop_t, wq_async);    uv_mutex_lock(&loop->wq_mutex);   // 把loop->wq队列的节点全部移到wp变量中,wq的队列在线程处理函数work里进行设置   QUEUE_MOVE(&loop->wq, &wq);   uv_mutex_unlock(&loop->wq_mutex);    while (!QUEUE_EMPTY(&wq)) {     q = QUEUE_HEAD(&wq);     QUEUE_REMOVE(q);      w = container_of(q, struct uv__work, wq);     err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;     // 执行回调     w->done(w, err);   } }

这里又和提交任务的方式有关,如果是用户通过uv_queue_work提交的任务,这时候的回调uv__queue_done。该函数也是做了一些封装。

static void uv__queue_done(struct uv__work* w, int err) {   uv_work_t* req;    req = container_of(w, uv_work_t, work_req);   uv__req_unregister(req->loop, req);    if (req->after_work_cb == NULL)     return;   // 用户设置的回调   req->after_work_cb(req, err); }

如果是libuv本身提交的任务。则回调函数即libuv设置的,没有经过封装。最后提一下libuv提供的取消任务函数。

static int uv__work_cancel(uv_loop_t* loop, uv_req_t* req, struct uv__work* w) {   int cancelled;   // 加锁,为了把节点移出队列   uv_mutex_lock(&mutex);   // 加锁,为了判断w->wq是否为空   uv_mutex_lock(&w->loop->wq_mutex);   // w在一个队列中并work不为空,则可取消   cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL;   // 删除该节点   if (cancelled)     QUEUE_REMOVE(&w->wq);    uv_mutex_unlock(&w->loop->wq_mutex);   uv_mutex_unlock(&mutex);    if (!cancelled)     return UV_EBUSY;   // 重置回调函数   w->work = uv__cancelled;    uv_mutex_lock(&loop->wq_mutex);    // 插入loop的wq队列   QUEUE_INSERT_TAIL(&loop->wq, &w->wq);   // 通知主线程执行任务回调   uv_async_send(&loop->wq_async);   uv_mutex_unlock(&loop->wq_mutex);    return 0; }

到这,libuv的线程池就解析完了。

posted on 2019-10-14 15:46 长戟十三千 阅读(794) 评论(0)  编辑 收藏 引用 所属分类: libuv

只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   博问   Chat2DB   管理