workflow源码分析:线程池

搜狗的开源项目中有一个非常重要的基础模块:线程池,代码只有300行代码

简介

Workflow的大招:计算通信融为一体的异步调度模式,而计算的核心:Executor调度器,就是基于这个线程池实现的。可以说,一个通用而高效的线程池,是我们写C/C++代码时离不开的基础模块。

thrdpool代码位置在src/kernel/,不仅可以直接拿来使用,同时也适合阅读学习。

特点:

  1. 创建完线程池后,无需记录任何线程id或对象,线程池可以通过一个等一个的方式优雅地去结束所有线程(线程是对等的)
  2. 线程任务可以由另一个任务调起,线程池正在被销毁的时候也可以提交下一个任务
  3. 线程任务可以销毁这个线程池(行为是对等的)

前置知识

为什么需要线程池?(其实思路不仅对线程池,对任何有限资源的调度管理都是类似的)

我们知道,C/C++常用的pthread或者std::thread就可以创建线程,实现多线程并行执行我们的代码。

但是CPU可以并行的线程数是固定的,所以真正并行执行的最大值也是固定的,过多的线程创建除了频繁产生创建的overhead以外,还会导致对系统资源进行争抢,这些都是不必要的浪费。

因此我们可以管理有限个线程,循环且合理地利用它们。

那么线程池一般包含哪些内容呢?

  • 首先是管理若干个线程;
  • 其次是管理交给线程去执行的任务,这个一般会是一个队列;
  • 再然后线程之间需要一些同步机制,比如mutex、condition等;
  • 最后就是各线程池实现上自身需要的其他内容了;

代码概览

  • 头文件,看看模块提供的接口
1
2
3
4
5
6
7
// 创建线程池
thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize);
// 把任务交给线程池的入口
int thrdpool_schedule(const struct thrdpool_task *task, thrdpool_t *pool);
// 销毁线程池
void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
thrdpool_t *pool);

我们先观察这三个接口:创建线程池、将任务交给线程池和销毁线程池

  • 接口上的数据结构

    如何描述一个交给线程池的任务:

    1
    2
    3
    4
    5
    struct thrdpool_task                                                            
    {
    void (*routine)(void *); // 一个函数指针
    void *context; // 一个上下文
    };
  • 看一下.c文件 观察他的内部数据结构

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    struct __thrdpool
    {
    struct list_head task_queue; // 任务队列
    size_t nthreads; // 线程个数
    size_t stacksize; // 构造线程时的参数
    pthread_t tid; // 运行起来之后,pool上记录的这个是zero值
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    pthread_key_t key;
    pthread_cond_t *terminate;
    };
    • tid:线程id,只有一个,不用记录任何一个线程的id,平时运行的时候是空值,退出的时候它是用来实现链式等待的关键
    • mutex和cond,常见的线程间同步的工具
    • key:线程池的key,赋予给每个由线程池创建的线程,用于区分这个线程是否是线程池创建的
    • pthread_cond_t *terminate:1. 退出时的标记位 2. 调用退出的那个人要等待的condition
  • 接口调用的核心函数

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize)
    {
    thrdpool_t *pool;
    int ret;

    pool = (struct __thrdpool *)malloc(sizeof (struct __thrdpool));
    if (pool)
    {
    if (__thrdpool_init_locks(pool) >= 0)
    {
    ret = pthread_key_create(&pool->key, NULL);
    if (ret == 0)
    {
    INIT_LIST_HEAD(&pool->task_queue);
    pool->stacksize = stacksize;
    pool->nthreads = 0;
    memset(&pool->tid, 0, sizeof (pthread_t));
    pool->terminate = NULL;
    if (__thrdpool_create_threads(nthreads, pool) >= 0)
    return pool;

    pthread_key_delete(pool->key);
    }
    else
    errno = ret;

    __thrdpool_destroy_locks(pool);
    }

    free(pool);
    }

    return NULL;
    }
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    static int __thrdpool_create_threads(size_t nthreads, thrdpool_t *pool)
    {
    pthread_attr_t attr;
    pthread_t tid;
    int ret;

    ret = pthread_attr_init(&attr);
    if (ret == 0)
    {
    if (pool->stacksize)
    pthread_attr_setstacksize(&attr, pool->stacksize);

    while (pool->nthreads < nthreads)
    {
    ret = pthread_create(&tid, &attr, __thrdpool_routine, pool);
    if (ret == 0)
    pool->nthreads++;
    else
    break;
    }

    pthread_attr_destroy(&attr);
    if (pool->nthreads == nthreads)
    return 0;

    __thrdpool_terminate(pool);
    }

    errno = ret;
    return -1;
    }
  • 核心函数的功能

    每个线程执行的是__thrdpool_routine()。不难想象,它会不停从队列拿任务出来执行

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    static void *__thrdpool_routine(void *arg)
    {
    thrdpool_t *pool = (thrdpool_t *)arg;
    struct list_head **pos = &pool->task_queue.next;
    struct __thrdpool_task_entry *entry;
    pthread_t tid;

    pthread_setspecific(pool->key, pool);
    while (1)
    {
    //从队列里拿一个任务出来,没有就等待
    pthread_mutex_lock(&pool->mutex);
    while (!pool->terminate && list_empty(&pool->task_queue))
    pthread_cond_wait(&pool->cond, &pool->mutex);

    if (pool->terminate) //线程池结束的标志位
    break;
    //拿到了任务
    entry = list_entry(*pos, struct __thrdpool_task_entry, list);
    list_del(*pos);
    pthread_mutex_unlock(&pool->mutex);
    //执行
    entry->task.routine(entry->task.context);
    free(entry);
    }

    tid = pool->tid;
    pool->tid = pthread_self();
    if (--pool->nthreads == 0)
    pthread_cond_signal(pool->terminate);

    pthread_mutex_unlock(&pool->mutex);
    if (memcmp(&tid, &__zero_tid, sizeof (pthread_t)) != 0)
    pthread_join(tid, NULL);

    return NULL;
    }
  • 函数之间的关系联系起来

    __thrdpool_routine()就是线程的核心函数了,它可以和接口thrdpool_schedule()关联上。

    我们说过,线程池上有个队列管理任务:

    • 所以,每个执行routine的线程,都是消费者;

    • 而每个发起schedule的线程,都是生产者;

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      inline void __thrdpool_schedule(const struct thrdpool_task *task, void *buf, 
      thrdpool_t *pool)
      {
      struct __thrdpool_task_entry *entry = (struct __thrdpool_task_entry *)buf;
      entry->task = *task;
      pthread_mutex_lock(&pool->mutex);
      list_add_tail(&entry->list, &pool->task_queue); // 添加到队列里
      pthread_cond_signal(&pool->cond); // 叫醒在等待的线程
      pthread_mutex_unlock(&pool->mutex);
      }
  • 销毁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
    thrdpool_t *pool)
    {
    struct __thrdpool_task_entry *entry;
    struct list_head *pos, *tmp;

    __thrdpool_terminate(pool);
    list_for_each_safe(pos, tmp, &pool->task_queue)
    {
    entry = list_entry(pos, struct __thrdpool_task_entry, list);
    list_del(pos);
    if (pending)
    pending(&entry->task);

    free(entry);
    }

    pthread_key_delete(pool->key);
    __thrdpool_destroy_locks(pool);
    free(pool);
    }

    在退出的时候,我们那些已经提交但是还没有被执行的任务是绝对不能就这么扔掉了的,于是我们可以传入一个pending函数,上层可以做自己的回收、回调、任何保证上层逻辑完备的事情

一个接着一个优雅退出

这里提出一个问题:线程池要退出,如何结束所有线程

一般线程池的实现都是需要记录下所有的线程id,或者thread对象,以便于我们去join等待它们结束。

而线性地退出,一环扣一环,长度本身不重要,让事情可以递归起来,是非常符合计算机世界的常规做法的。

但是我们刚才看,pool里并没有记录所有的tid呀?正如开篇说的,pool上只有一个tid,而且还是个空的值

所以特点1给出了Workflowthrdpool的答案:

无需记录所有线程,我可以让线程挨个自动退出、且一个等待一个,最终达到我调用完thrdpool_destroy()后内存可以回收干净的目的。

v2-f598b30be24b565685092d1bf96b5d79_720w

步骤如下:

  1. 线程的退出,由thrdpool_destroy()设置pool->terminate开始。
  2. 我们每个线程,在while(1)里会第一时间发现terminate,线程池要退出了,然后会break出这个while循环。
  3. 注意这个时候,还持有着mutex锁,我们拿出pool上唯一的那个tid,放到我的临时变量,我会根据拿出来的值做不同的处理。且我会把我自己的tid放上去,然后再解mutex锁。
  4. 那么很显然,第一个从pool上拿tid的人,会发现这是个0值,就可以直接结束了,不用负责等待任何其他人,但我在完全结束之前需要有人负责等待我的结束,所以我会把我的id放上去。
  5. 而如果发现自己从pool里拿到的tid不是0值,说明我要负责join上一个人,并且把我的tid放上去,让下一个人负责我。
  6. 最后的那个人,是那个发现pool->nthreads为0的人,那么我就可以通过这个terminate(它本身是个condition)去通知发起destroy的人。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static void *__thrdpool_routine(void *arg) 
{
while (1)
{
pthread_mutex_lock(&pool->mutex); // 1.注意这里还持有锁
// 等着队列拿任务出来
if (pool->terminate) // 2. 这既是标识位,也是发起销毁的那个人所等待的condition
break;
// 执行拿到的任务
}

/* One thread joins another. Don't need to keep all thread IDs. */
tid = pool->tid; // 3. 把线程池上记录的那个tid拿下来,我来负责上一人
pool->tid = pthread_self(); // 4. 把我自己记录到线程池上,下一个人来负责我
if (--pool->nthreads == 0) // 5. 每个人都减1,最后一个人负责叫醒发起detroy的人
pthread_cond_signal(pool->terminate);

pthread_mutex_unlock(&pool->mutex); // 6. 这里可以解锁进行等待了
if (memcmp(&tid, &__zero_tid, sizeof (pthread_t)) != 0) // 7. 只有第一个人拿到0
pthread_join(tid, NULL); // 8. 只要不是0值,我就要负责等上一个结束才能退

return NULL; // 9. 退出,干干净净~
}

线程任务可以由另一个线程任务调起

在第二部分我们看过源码,只要队列管理得好,线程任务里提交下一个任务是完全合理,也是非常重要的。

重要性在哪里呢?

由于在发起任务时,我们是无法知道线程池的状况的,但我们必须严格保证“提交任务”的接口语义,即提交给线程池的就必须保证可以由线程池负责,这对上层进行二次封装的生命周期管理非常重要。

所以我们才会引申到特点2的另一个解读:线程池被销毁时也可以提交下一个任务,必须强调的是,是指在线程任务里。而且刚才提过,还没有被执行的任务,可以通过我们传入的pending()函数拿回来。

而更进一步,在线程任务里也可以销毁只是为了让逻辑完整的特点,我们后放到特点3里说。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static void __thrdpool_terminate(int in_pool, thrdpool_t *pool) 
{
pthread_cond_t term = PTHREAD_COND_INITIALIZER;

pthread_mutex_lock(&pool->mutex); // 1. 加锁设置标识位
pool->terminate = &term; // 2. 之后的添加任务不会被执行,但可以pending拿到
pthread_cond_broadcast(&pool->cond); // 3. 广播所有等待的消费者

if (in_pool) // 4. 这里的魔法等下讲>_<~
{
/* Thread pool destroyed in a pool thread is legal. */
pthread_detach(pthread_self());
pool->nthreads--;
}

while (pool->nthreads > 0) // 5. 如果还有线程没有退完,我会等,注意这里是while
pthread_cond_wait(&term, &pool->mutex);

pthread_mutex_unlock(&pool->mutex);
if (memcmp(&pool->tid, &__zero_tid, sizeof (pthread_t)) != 0)
pthread_join(pool->tid, NULL); // 6. 同样地等待打算退出的上一个人
}

同样可以在线程任务里销毁线程池

既然线程任务可以做任何事情,理论上,线程任务也可以销毁线程池

作为一个逻辑完备的线程池,大胆一点,我们把问号去掉。

而且,销毁并不会结束当前任务,它会等这个任务执行完。

想象一下,刚才的__thrdpool_routine()while里拿出来的那个任务,做的事情竟然是发起thrdpool_destroy()

我们来把上面的图改一下:

v2-caf52b89ad117d49e7d2adf54c238c28_720w

简单的使用

这个线程池只有两个文件: thrdpool.hthrdpool.c,而且只依赖内核的数据结构list.h。我们把它拿出来玩,自己写一段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void my_routine(void *context) // 我们要执行的函数  
{
printf("task-%llu start.\n", reinterpret_cast<unsigned long long>(context));
}

void my_pending(const struct thrdpool_task *task) // 线程池销毁后,没执行的任务会到这里
{
printf("pending task-%llu.\n", reinterpret_cast<unsigned long long>(task->context));
}

int main()
{
thrdpool_t *thrd_pool = thrdpool_create(3, 1024); // 创建
struct thrdpool_task task;
unsigned long long i;

for (i = 0; i < 5; i++)
{
task.routine = &my_routine;
task.context = reinterpret_cast<void *>(i);
thrdpool_schedule(&task, thrd_pool); // 调用
}
getchar(); // 卡住主线程,按回车继续
thrdpool_destroy(&my_pending, thrd_pool); // 结束
return 0;
}