From 49839c88a98d3798f7b18c58f54f26f36cacff38 Mon Sep 17 00:00:00 2001 From: Guangxiong Lin Date: Fri, 9 Dec 2022 16:46:49 +0800 Subject: Implement a simple thread pool and refactor Refactor --- tpool.c | 186 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 186 insertions(+) create mode 100644 tpool.c (limited to 'tpool.c') diff --git a/tpool.c b/tpool.c new file mode 100644 index 0000000..1aa5986 --- /dev/null +++ b/tpool.c @@ -0,0 +1,186 @@ +#include +#include +#include "tpool.h" + +struct tpool_work { + thread_func_t func; + void *arg; + struct tpool_work *next; +}; +typedef struct tpool_work tpool_work_t; + +tpool_t *tpool; + +static tpool_work_t *tpool_work_create(thread_func_t func, void *arg) +{ + tpool_work_t *work; + + if (func == NULL) + return NULL; + + work = (tpool_work_t *)malloc(sizeof(*work)); + work->func = func; + work->arg = arg; + work->next = NULL; + return work; +} + +static void tpool_work_destroy(tpool_work_t *work) +{ + if (work == NULL) + return; + + free(work); +} + +struct tpool { + tpool_work_t *work_first; + tpool_work_t *work_last; + pthread_mutex_t work_mutex; + pthread_cond_t work_cond; + pthread_cond_t working_cond; + size_t working_cnt; + size_t thread_cnt; + bool stop; +}; + +static tpool_work_t *tpool_pop(tpool_t *tp) +{ + if (tp == NULL) + return NULL; + + tpool_work_t *work = tp->work_first; + if (work == NULL) + return NULL; + + tp->work_first = work->next; + if (work == tp->work_last) + tp->work_last = NULL; + + return work; +} + +static void *tpool_worker(void *arg) +{ + tpool_t *tp = (tpool_t *)arg; + tpool_work_t *work; + + for (;;) { + pthread_mutex_lock(&tp->work_mutex); + + while (tp->work_first == NULL && !tp->stop) + pthread_cond_wait(&tp->work_cond, &tp->work_mutex); + + if (tp->stop) + break; + + work = tpool_pop(tp); + tp->working_cnt++; + pthread_mutex_unlock(&tp->work_mutex); + + if (work != NULL) { + work->func(work->arg); + tpool_work_destroy(work); + } + + pthread_mutex_lock(&tp->work_mutex); + tp->working_cnt--; + if (!tp->stop && tp->working_cnt == 0 && tp->work_first == NULL) + pthread_cond_signal(&tp->working_cond); + pthread_mutex_unlock(&tp->work_mutex); + } + + tp->thread_cnt--; + pthread_cond_signal(&tp->working_cond); + pthread_mutex_unlock(&tp->work_mutex); + + return NULL; +} + +tpool_t *tpool_create(size_t num) +{ + tpool_t *tp; + pthread_t thread; + + if (num == 0) + num = 12; + + tp = (tpool_t *)malloc(sizeof(*tp)); + tp->thread_cnt = num; + tp->work_first = tp->work_last = NULL; + + pthread_mutex_init(&tp->work_mutex, NULL); + pthread_cond_init(&tp->work_cond, NULL); + pthread_cond_init(&tp->working_cond, NULL); + + for (int i = 0; i < num; i++) { + pthread_create(&thread, NULL, tpool_worker, tp); + pthread_detach(thread); + } + + return tp; +} + +void tpool_destroy(tpool_t *tp) +{ + tpool_work_t *work, *next_work; + + if (tp == NULL) + return; + + pthread_mutex_lock(&tp->work_mutex); + work = tp->work_first; + while (work != NULL) { + next_work = work->next; + tpool_work_destroy(work); + work = next_work; + } + tp->stop = true; + + pthread_cond_broadcast(&tp->work_cond); + pthread_mutex_unlock(&tp->work_mutex); + + tpool_wait(tp); + + pthread_mutex_destroy(&tp->work_mutex); + pthread_cond_destroy(&tp->work_cond); + pthread_cond_destroy(&tp->working_cond); + + free(tp); +} + +bool tpool_add_work(tpool_t *tp, thread_func_t func, void *arg) +{ + tpool_work_t *work; + + if (tp == NULL) + return false; + + work = tpool_work_create(func, arg); + if (work == NULL) + return false; + + pthread_mutex_lock(&tp->work_mutex); + if (!tp->work_first) + tp->work_first = tp->work_last = work; + else + tp->work_last = tp->work_last->next = work; + + pthread_cond_broadcast(&tp->work_cond); + pthread_mutex_unlock(&tp->work_mutex); + + return true; +} + +void tpool_wait(tpool_t *tp) +{ + if (tp == NULL) + return; + + pthread_mutex_lock(&tp->work_mutex); + + while ((!tp->stop && tp->working_cnt != 0) || (tp->stop && tp->thread_cnt != 0)) + pthread_cond_wait(&tp->working_cond, &tp->work_mutex); + + pthread_mutex_unlock(&tp->work_mutex); +} -- cgit v1.2.3