From 49839c88a98d3798f7b18c58f54f26f36cacff38 Mon Sep 17 00:00:00 2001 From: Guangxiong Lin Date: Fri, 9 Dec 2022 16:46:49 +0800 Subject: Implement a simple thread pool and refactor Refactor --- Makefile | 9 +-- acceptor.c | 39 ++++++++----- acceptor.h | 9 +-- connection.c | 23 +++++--- connection.h | 14 ++--- eventloop.c | 84 --------------------------- eventloop.h | 31 ---------- evloop.c | 89 ++++++++++++++++++++++++++++ evloop.h | 32 ++++++++++ server.c | 22 ++++--- tpool.c | 186 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ tpool.h | 20 +++++++ tsocket.c | 10 ++-- tsocket.h | 11 ++-- 14 files changed, 402 insertions(+), 177 deletions(-) delete mode 100644 eventloop.c delete mode 100644 eventloop.h create mode 100644 evloop.c create mode 100644 evloop.h create mode 100644 tpool.c create mode 100644 tpool.h diff --git a/Makefile b/Makefile index 333935f..bceb62e 100644 --- a/Makefile +++ b/Makefile @@ -1,12 +1,13 @@ CC ?= gcc +CFLAGS ?= -lpthread + +.PHONY: all +all: client server .PHONY: debug debug: CFLAGS += -g -DDEBUG=1 debug: server client -.PHONY: all -all: client server - .PHONY: clean clean: rm -rf client server *.o @@ -17,5 +18,5 @@ clean: client: client.o util.o $(CC) -o $@ $^ -server: server.o util.o eventloop.o tsocket.o acceptor.o connection.o +server: server.o util.o evloop.o tsocket.o acceptor.o connection.o tpool.o $(CC) -o $@ $^ diff --git a/acceptor.c b/acceptor.c index bb4fb88..1f28fcc 100644 --- a/acceptor.c +++ b/acceptor.c @@ -1,51 +1,58 @@ #include #include #include +#include #include "acceptor.h" #include "util.h" #include "connection.h" +#include "evloop.h" -int connAcceptorAccept(struct connAcceptor *ca) +struct conn_acceptor { + struct tsocket *sock; + evloop_t *el; +}; +typedef struct conn_acceptor conn_acceptor_t; + +static void conn_acceptor_accept(conn_acceptor_t *ca) { - struct tsocket *conn_sock = tsocketAccept(ca->sock); + struct tsocket *conn_sock = tsocket_accept(ca->sock); if (conn_sock == NULL) { perror("socket accept"); - return 0; + return; } if (setblocking(conn_sock->fd, false) == -1) { perror("setblocking"); - tsocketDelete(conn_sock); - return 0; + tsocket_destroy(conn_sock); + return; } - struct connection *conn = connectionNew(conn_sock); - struct event *conn_ev = connectionNewEvent(conn); + event_t *conn_ev = connection_create_event(conn_sock); - if (eventLoopAdd(ca->el, conn_ev, EPOLLIN | EPOLLET) == -1) { + if (evloop_add(ca->el, conn_ev, EPOLLIN | EPOLLET) == -1) { perror("eventloop add fd: conn_sock"); - return 0; + return; } printf("New client fd %d, ip: %s, port: %d\n", conn_sock->fd, conn_sock->addr, conn_sock->port); - - return 0; } -void connAcceptorDel(struct connAcceptor *ca) +static void conn_acceptor_destroy(conn_acceptor_t *ca) { - tsocketDelete(ca->sock); + tsocket_destroy(ca->sock); free(ca); } -struct event *connAcceptorNewEvent(struct tsocket *sock, struct eventLoop *el) +event_t *conn_acceptor_create_event(struct tsocket *sock, evloop_t *el) { - struct connAcceptor *ca = malloc(sizeof(*ca)); + conn_acceptor_t *ca = malloc(sizeof(*ca)); ca->sock = sock; ca->el = el; - struct event *ev = eventNew(ca, sock->fd, connAcceptorAccept, connectionDel); + struct event *ev = event_create(ca, sock->fd, + (evloop_process_func_t) conn_acceptor_accept, + (evloop_destroy_func_t) conn_acceptor_destroy); return ev; } diff --git a/acceptor.h b/acceptor.h index 8432ef6..f4c9284 100644 --- a/acceptor.h +++ b/acceptor.h @@ -1,14 +1,9 @@ #include "tsocket.h" -#include "eventloop.h" +#include "evloop.h" #ifndef __ACCEPTOR_H #define __ACCEPTOR_H -struct connAcceptor { - struct tsocket *sock; - struct eventLoop *el; -}; - -struct event *connAcceptorNewEvent(struct tsocket *sock, struct eventLoop *el); +struct event *conn_acceptor_create_event(struct tsocket *sock, evloop_t *el); #endif diff --git a/connection.c b/connection.c index d12ab86..c164ec8 100644 --- a/connection.c +++ b/connection.c @@ -8,7 +8,11 @@ #define READ_BUFFER_SIZE 1024 -struct connection *connectionNew(struct tsocket *sock) +struct connection { + struct tsocket *sock; +}; + +struct connection *connection_create(struct tsocket *sock) { struct connection *conn = malloc(sizeof(*conn)); conn->sock = sock; @@ -16,13 +20,13 @@ struct connection *connectionNew(struct tsocket *sock) return conn; } -void connectionDel(struct connection *conn) +void connection_destroy(struct connection *conn) { - tsocketDelete(conn->sock); + tsocket_destroy(conn->sock); free(conn); } -int echo(struct connection *conn) +static void echo(struct connection *conn) { char buf[READ_BUFFER_SIZE]; ssize_t n_read_bytes; @@ -36,18 +40,19 @@ int echo(struct connection *conn) write(sock->fd, buf, sizeof(buf)); } else if (n_read_bytes == 0) { printf("conn %d disconnected\n", sock->fd); - return ERROR; + return; } else if (n_read_bytes == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) break; } } - - return OK; } -struct event *connectionNewEvent(struct connection *conn) +event_t *connection_create_event(struct tsocket *sock) { - return eventNew(conn, conn->sock->fd, echo, connectionDel); + connection_t *conn = connection_create(sock); + return event_create(conn, conn->sock->fd, + (evloop_process_func_t) echo, + (evloop_destroy_func_t) connection_destroy); } diff --git a/connection.h b/connection.h index c7b24fc..4e14487 100644 --- a/connection.h +++ b/connection.h @@ -1,15 +1,15 @@ #include "tsocket.h" -#include "eventloop.h" +#include "evloop.h" #ifndef __CONNECTION_H #define __CONNECTION_H -struct connection { - struct tsocket *sock; -}; +struct connection; +typedef struct connection connection_t; -struct connection *connectionNew(struct tsocket *sock); -void connectionDel(struct connection *conn); -struct event *connectionNewEvent(struct connection *conn); +connection_t *connection_create(struct tsocket *sock); +void connection_destroy(struct connection *conn); + +event_t *connection_create_event(struct tsocket *sock); #endif diff --git a/eventloop.c b/eventloop.c deleted file mode 100644 index ae4377c..0000000 --- a/eventloop.c +++ /dev/null @@ -1,84 +0,0 @@ -#include -#include - -#include "eventloop.h" -#include "util.h" -#include "constant.h" - -struct eventLoop *eventLoopNew() -{ - int epollfd = epoll_create1(0); - if (epollfd == -1) - return NULL; - - struct eventLoop *eventLoop = malloc(sizeof(*eventLoop)); - eventLoop->epollfd = epollfd; - eventLoop->size = EVENT_LOOP_MAX_EVENTS; - - return eventLoop; -} - -int eventLoopWait(struct eventLoop *el, int timeout) -{ - return epoll_wait(el->epollfd, el->events, el->size, timeout); -} - -struct tsocket *eventLoopGetSocket(struct eventLoop *el, int index) -{ - return (struct tsocket *)el->events[index].data.ptr; -} - -int eventLoopAdd(struct eventLoop *el, struct event *ev, int flag) -{ - struct epoll_event epev; - epev.events = flag; - epev.data.ptr = ev; - - return epoll_ctl(el->epollfd, EPOLL_CTL_ADD, ev->fd, &epev); -} - -struct event *eventLoopGet(struct eventLoop *el, int index) -{ - return (struct event *)el->events[index].data.ptr; -} - -void eventLoopLoop(struct eventLoop *el) -{ - int nevents; - struct event *ev; - - for (;;) { - nevents = eventLoopWait(el, -1); - if (nevents == -1) - panic("eventloop wait"); - - for (int i = 0; i < nevents; i++) { - ev = eventLoopGet(el, i); - if (ev->handle(ev->data) == -1) - eventLoopDel(el, ev); - } - } -} - -int eventLoopDel(struct eventLoop *el, struct event *ev) -{ - if (epoll_ctl(el->epollfd, EPOLL_CTL_DEL, ev->fd, NULL) == -1) - return ERROR; - - if (ev->delete) - ev->delete(ev->data); - free(ev); - - return OK; -} - -struct event *eventNew(void *data, int fd, void (*handle), void (*delete)) -{ - struct event *ev = malloc(sizeof(*ev)); - ev->data = data; - ev->handle = handle; - ev->fd = fd; - ev->delete = delete; - - return ev; -} diff --git a/eventloop.h b/eventloop.h deleted file mode 100644 index 9aafb68..0000000 --- a/eventloop.h +++ /dev/null @@ -1,31 +0,0 @@ -#include - -#include "tsocket.h" - -#ifndef __EVENT_LOOP_H -#define __EVENT_LOOP_H - -#define EVENT_LOOP_MAX_EVENTS 1024 - -struct eventLoop { - int epollfd; - struct epoll_event events[EVENT_LOOP_MAX_EVENTS]; - int size; -}; - -struct event { - int fd; - void *data; - int (*handle)(void *data); - void (*delete)(void *data); -}; - -struct eventLoop *eventLoopNew(); -int eventLoopWait(struct eventLoop *el, int timeout); -int eventLoopAdd(struct eventLoop *el, struct event *ev, int flag); -int eventLoopDel(struct eventLoop *el, struct event *ev); -struct event *eventLoopGet(struct eventLoop *el, int index); -void eventLoopLoop(struct eventLoop *el); -struct event *eventNew(void *data, int fd, void (*handle), void (*delete)); - -#endif diff --git a/evloop.c b/evloop.c new file mode 100644 index 0000000..5bed1e9 --- /dev/null +++ b/evloop.c @@ -0,0 +1,89 @@ +#include +#include + +#include "evloop.h" +#include "util.h" +#include "constant.h" +#include "tpool.h" + +struct evloop { + int epollfd; + struct epoll_event events[EVENT_LOOP_MAX_EVENTS]; + int size; +}; + +evloop_t *evloop_create() +{ + int epollfd = epoll_create1(0); + if (epollfd == -1) + return NULL; + + evloop_t *eventLoop = malloc(sizeof(*eventLoop)); + eventLoop->epollfd = epollfd; + eventLoop->size = EVENT_LOOP_MAX_EVENTS; + + return eventLoop; +} + +int evloop_wait(evloop_t *el, int timeout) +{ + return epoll_wait(el->epollfd, el->events, el->size, timeout); +} + +int evloop_add(evloop_t *el, event_t *ev, int flag) +{ + struct epoll_event epev; + epev.events = flag; + epev.data.ptr = ev; + + return epoll_ctl(el->epollfd, EPOLL_CTL_ADD, ev->fd, &epev); +} + +event_t *evloop_get(evloop_t *el, int index) +{ + return (event_t *)el->events[index].data.ptr; +} + +void evloop_loop(evloop_t *el) +{ + int nevents; + event_t *ev; + + for (;;) { + nevents = evloop_wait(el, -1); + if (nevents == -1) + panic("eventloop wait"); + + for (int i = 0; i < nevents; i++) { + ev = evloop_get(el, i); + // TODO: detect if there is any issue of the handle function + // and delete the event when issues happen + tpool_add_work(tpool, ev->process, ev->data); + } + } +} + +int evloop_remove(evloop_t *el, event_t *ev) +{ + if (epoll_ctl(el->epollfd, EPOLL_CTL_DEL, ev->fd, NULL) == -1) + return ERROR; + + if (ev->destroy) + ev->destroy(ev->data); + free(ev); + + return OK; +} + +event_t *event_create(void *data, int fd, + evloop_process_func_t process, + evloop_destroy_func_t destroy) +{ + event_t *ev = malloc(sizeof(*ev)); + ev->data = data; + ev->process = process; + ev->fd = fd; + ev->destroy = destroy; + + return ev; +} diff --git a/evloop.h b/evloop.h new file mode 100644 index 0000000..f0d4f72 --- /dev/null +++ b/evloop.h @@ -0,0 +1,32 @@ +#include "tsocket.h" + +#ifndef __EVLOOP_H +#define __EVLOOP_H + +#define EVENT_LOOP_MAX_EVENTS 1024 + +struct evloop; +typedef struct evloop evloop_t; + +typedef void (*evloop_process_func_t)(void *data); +typedef void (*evloop_destroy_func_t)(void *data); + +struct event { + int fd; + void *data; + evloop_destroy_func_t destroy; + evloop_process_func_t process; +}; +typedef struct event event_t; + +evloop_t *evloop_create(); +int evloop_wait(evloop_t *el, int timeout); +int evloop_add(evloop_t *el, event_t *ev, int flag); +int evloop_remove(evloop_t *el, event_t *ev); +event_t *evloop_get(evloop_t *el, int index); +void evloop_loop(evloop_t *el); +event_t *event_create(void *data, int fd, + evloop_process_func_t process, + evloop_destroy_func_t destroy); + +#endif diff --git a/server.c b/server.c index f4ad78d..8525689 100644 --- a/server.c +++ b/server.c @@ -10,27 +10,31 @@ #include #include -#include "eventloop.h" +#include "evloop.h" #include "tsocket.h" #include "util.h" -#include "connection.h" #include "acceptor.h" +#include "tpool.h" int main() { - struct eventLoop *el = eventLoopNew(); + evloop_t *el = evloop_create(); if (el == NULL) panic("eventloop creation"); - struct tsocket *sock = tsocketNew(); + struct tsocket *sock = tsocket_create(); if (sock == NULL - || tsocketBind(sock, "127.0.0.1", 8888) == -1 - || tsocketListen(sock) == -1) + || tsocket_bind(sock, "127.0.0.1", 8888) == -1 + || tsocket_listen(sock) == -1) panic("socket creation"); - struct event *acceptEvent = connAcceptorNewEvent(sock, el); - if (eventLoopAdd(el, acceptEvent, EPOLLIN) == -1) + tpool = tpool_create(0); + if (!tpool) + panic("tpool_create"); + + event_t *acceptEvent = conn_acceptor_create_event(sock, el); + if (evloop_add(el, acceptEvent, EPOLLIN) == -1) panic("eventloop add fd"); - eventLoopLoop(el); + evloop_loop(el); } diff --git a/tpool.c b/tpool.c new file mode 100644 index 0000000..1aa5986 --- /dev/null +++ b/tpool.c @@ -0,0 +1,186 @@ +#include +#include +#include "tpool.h" + +struct tpool_work { + thread_func_t func; + void *arg; + struct tpool_work *next; +}; +typedef struct tpool_work tpool_work_t; + +tpool_t *tpool; + +static tpool_work_t *tpool_work_create(thread_func_t func, void *arg) +{ + tpool_work_t *work; + + if (func == NULL) + return NULL; + + work = (tpool_work_t *)malloc(sizeof(*work)); + work->func = func; + work->arg = arg; + work->next = NULL; + return work; +} + +static void tpool_work_destroy(tpool_work_t *work) +{ + if (work == NULL) + return; + + free(work); +} + +struct tpool { + tpool_work_t *work_first; + tpool_work_t *work_last; + pthread_mutex_t work_mutex; + pthread_cond_t work_cond; + pthread_cond_t working_cond; + size_t working_cnt; + size_t thread_cnt; + bool stop; +}; + +static tpool_work_t *tpool_pop(tpool_t *tp) +{ + if (tp == NULL) + return NULL; + + tpool_work_t *work = tp->work_first; + if (work == NULL) + return NULL; + + tp->work_first = work->next; + if (work == tp->work_last) + tp->work_last = NULL; + + return work; +} + +static void *tpool_worker(void *arg) +{ + tpool_t *tp = (tpool_t *)arg; + tpool_work_t *work; + + for (;;) { + pthread_mutex_lock(&tp->work_mutex); + + while (tp->work_first == NULL && !tp->stop) + pthread_cond_wait(&tp->work_cond, &tp->work_mutex); + + if (tp->stop) + break; + + work = tpool_pop(tp); + tp->working_cnt++; + pthread_mutex_unlock(&tp->work_mutex); + + if (work != NULL) { + work->func(work->arg); + tpool_work_destroy(work); + } + + pthread_mutex_lock(&tp->work_mutex); + tp->working_cnt--; + if (!tp->stop && tp->working_cnt == 0 && tp->work_first == NULL) + pthread_cond_signal(&tp->working_cond); + pthread_mutex_unlock(&tp->work_mutex); + } + + tp->thread_cnt--; + pthread_cond_signal(&tp->working_cond); + pthread_mutex_unlock(&tp->work_mutex); + + return NULL; +} + +tpool_t *tpool_create(size_t num) +{ + tpool_t *tp; + pthread_t thread; + + if (num == 0) + num = 12; + + tp = (tpool_t *)malloc(sizeof(*tp)); + tp->thread_cnt = num; + tp->work_first = tp->work_last = NULL; + + pthread_mutex_init(&tp->work_mutex, NULL); + pthread_cond_init(&tp->work_cond, NULL); + pthread_cond_init(&tp->working_cond, NULL); + + for (int i = 0; i < num; i++) { + pthread_create(&thread, NULL, tpool_worker, tp); + pthread_detach(thread); + } + + return tp; +} + +void tpool_destroy(tpool_t *tp) +{ + tpool_work_t *work, *next_work; + + if (tp == NULL) + return; + + pthread_mutex_lock(&tp->work_mutex); + work = tp->work_first; + while (work != NULL) { + next_work = work->next; + tpool_work_destroy(work); + work = next_work; + } + tp->stop = true; + + pthread_cond_broadcast(&tp->work_cond); + pthread_mutex_unlock(&tp->work_mutex); + + tpool_wait(tp); + + pthread_mutex_destroy(&tp->work_mutex); + pthread_cond_destroy(&tp->work_cond); + pthread_cond_destroy(&tp->working_cond); + + free(tp); +} + +bool tpool_add_work(tpool_t *tp, thread_func_t func, void *arg) +{ + tpool_work_t *work; + + if (tp == NULL) + return false; + + work = tpool_work_create(func, arg); + if (work == NULL) + return false; + + pthread_mutex_lock(&tp->work_mutex); + if (!tp->work_first) + tp->work_first = tp->work_last = work; + else + tp->work_last = tp->work_last->next = work; + + pthread_cond_broadcast(&tp->work_cond); + pthread_mutex_unlock(&tp->work_mutex); + + return true; +} + +void tpool_wait(tpool_t *tp) +{ + if (tp == NULL) + return; + + pthread_mutex_lock(&tp->work_mutex); + + while ((!tp->stop && tp->working_cnt != 0) || (tp->stop && tp->thread_cnt != 0)) + pthread_cond_wait(&tp->working_cond, &tp->work_mutex); + + pthread_mutex_unlock(&tp->work_mutex); +} diff --git a/tpool.h b/tpool.h new file mode 100644 index 0000000..e1079ef --- /dev/null +++ b/tpool.h @@ -0,0 +1,20 @@ +#include +#include + +#ifndef __TPOOL_H +#define __TPOOL_H + +struct tpool; +typedef struct tpool tpool_t; + +extern tpool_t *tpool; + +typedef void (*thread_func_t)(void *arg); + +tpool_t *tpool_create(size_t num); +void tpool_destroy(tpool_t *tp); + +bool tpool_add_work(tpool_t *tp, thread_func_t func, void *arg); +void tpool_wait(tpool_t *tp); + +#endif diff --git a/tsocket.c b/tsocket.c index 76e6064..8d8038a 100644 --- a/tsocket.c +++ b/tsocket.c @@ -5,7 +5,7 @@ #include "tsocket.h" -struct tsocket *tsocketNew() +struct tsocket *tsocket_create() { int fd = socket(AF_INET, SOCK_STREAM, 0); if (fd == -1) @@ -17,7 +17,7 @@ struct tsocket *tsocketNew() return sock; } -int tsocketBind(struct tsocket *sock, const char *addr, int hostport) +int tsocket_bind(struct tsocket *sock, const char *addr, int hostport) { struct sockaddr_in sock_addr; sock_addr.sin_family = AF_INET; @@ -30,12 +30,12 @@ int tsocketBind(struct tsocket *sock, const char *addr, int hostport) return bind(sock->fd, (struct sockaddr *)&sock_addr, sizeof(sock_addr)); } -int tsocketListen(struct tsocket *sock) +int tsocket_listen(struct tsocket *sock) { return listen(sock->fd, SOMAXCONN); } -struct tsocket *tsocketAccept(struct tsocket *sock) +struct tsocket *tsocket_accept(struct tsocket *sock) { struct sockaddr_in addr; socklen_t addr_len = sizeof(addr); @@ -52,7 +52,7 @@ struct tsocket *tsocketAccept(struct tsocket *sock) return conn_sock; } -void tsocketDelete(struct tsocket *sock) +void tsocket_destroy(struct tsocket *sock) { close(sock->fd); free(sock); diff --git a/tsocket.h b/tsocket.h index 5a8826c..5028b04 100644 --- a/tsocket.h +++ b/tsocket.h @@ -8,11 +8,12 @@ struct tsocket { const char *addr; int port; }; +typedef struct tsocket tsocket_t; -struct tsocket *tsocketNew(); -int tsocketBind(struct tsocket *sock, const char *addr, int hostport); -int tsocketListen(struct tsocket *sock); -struct tsocket *tsocketAccept(struct tsocket *sock); -void tsocketDelete(struct tsocket *sock); +struct tsocket *tsocket_create(); +int tsocket_bind(struct tsocket *sock, const char *addr, int hostport); +int tsocket_listen(struct tsocket *sock); +struct tsocket *tsocket_accept(struct tsocket *sock); +void tsocket_destroy(struct tsocket *sock); #endif -- cgit v1.2.3