From 7369505397cdfddf0883e2c24e1652df8bd488fe Mon Sep 17 00:00:00 2001 From: Guangxiong Lin Date: Fri, 16 Dec 2022 13:53:35 +0800 Subject: Refactor file structure --- src/Makefile | 22 +++++++ src/acceptor.c | 58 +++++++++++++++++ src/acceptor.h | 9 +++ src/client.c | 48 ++++++++++++++ src/connection.c | 58 +++++++++++++++++ src/connection.h | 15 +++++ src/constant.h | 7 +++ src/evloop.c | 89 ++++++++++++++++++++++++++ src/evloop.h | 32 ++++++++++ src/server.c | 40 ++++++++++++ src/tpool.c | 186 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ src/tpool.h | 20 ++++++ src/tsocket.c | 59 ++++++++++++++++++ src/tsocket.h | 19 ++++++ src/util.c | 23 +++++++ src/util.h | 9 +++ 16 files changed, 694 insertions(+) create mode 100644 src/Makefile create mode 100644 src/acceptor.c create mode 100644 src/acceptor.h create mode 100644 src/client.c create mode 100644 src/connection.c create mode 100644 src/connection.h create mode 100644 src/constant.h create mode 100644 src/evloop.c create mode 100644 src/evloop.h create mode 100644 src/server.c create mode 100644 src/tpool.c create mode 100644 src/tpool.h create mode 100644 src/tsocket.c create mode 100644 src/tsocket.h create mode 100644 src/util.c create mode 100644 src/util.h (limited to 'src') diff --git a/src/Makefile b/src/Makefile new file mode 100644 index 0000000..bceb62e --- /dev/null +++ b/src/Makefile @@ -0,0 +1,22 @@ +CC ?= gcc +CFLAGS ?= -lpthread + +.PHONY: all +all: client server + +.PHONY: debug +debug: CFLAGS += -g -DDEBUG=1 +debug: server client + +.PHONY: clean +clean: + rm -rf client server *.o + +%.o: %.c + $(CC) -c $(CFLAGS) $(CPPFLAGS) $< -o $@ + +client: client.o util.o + $(CC) -o $@ $^ + +server: server.o util.o evloop.o tsocket.o acceptor.o connection.o tpool.o + $(CC) -o $@ $^ diff --git a/src/acceptor.c b/src/acceptor.c new file mode 100644 index 0000000..1f28fcc --- /dev/null +++ b/src/acceptor.c @@ -0,0 +1,58 @@ +#include +#include +#include +#include + +#include "acceptor.h" +#include "util.h" +#include "connection.h" +#include "evloop.h" + +struct conn_acceptor { + struct tsocket *sock; + evloop_t *el; +}; +typedef struct conn_acceptor conn_acceptor_t; + +static void conn_acceptor_accept(conn_acceptor_t *ca) +{ + struct tsocket *conn_sock = tsocket_accept(ca->sock); + if (conn_sock == NULL) { + perror("socket accept"); + return; + } + + if (setblocking(conn_sock->fd, false) == -1) { + perror("setblocking"); + tsocket_destroy(conn_sock); + return; + } + + event_t *conn_ev = connection_create_event(conn_sock); + + if (evloop_add(ca->el, conn_ev, EPOLLIN | EPOLLET) == -1) { + perror("eventloop add fd: conn_sock"); + return; + } + + printf("New client fd %d, ip: %s, port: %d\n", + conn_sock->fd, conn_sock->addr, conn_sock->port); +} + +static void conn_acceptor_destroy(conn_acceptor_t *ca) +{ + tsocket_destroy(ca->sock); + free(ca); +} + +event_t *conn_acceptor_create_event(struct tsocket *sock, evloop_t *el) +{ + conn_acceptor_t *ca = malloc(sizeof(*ca)); + ca->sock = sock; + ca->el = el; + + struct event *ev = event_create(ca, sock->fd, + (evloop_process_func_t) conn_acceptor_accept, + (evloop_destroy_func_t) conn_acceptor_destroy); + return ev; +} diff --git a/src/acceptor.h b/src/acceptor.h new file mode 100644 index 0000000..f4c9284 --- /dev/null +++ b/src/acceptor.h @@ -0,0 +1,9 @@ +#include "tsocket.h" +#include "evloop.h" + +#ifndef __ACCEPTOR_H +#define __ACCEPTOR_H + +struct event *conn_acceptor_create_event(struct tsocket *sock, evloop_t *el); + +#endif diff --git a/src/client.c b/src/client.c new file mode 100644 index 0000000..0b9cbfe --- /dev/null +++ b/src/client.c @@ -0,0 +1,48 @@ +#include +#include +#include +#include +#include +#include +#include + +#include "util.h" + +int +main() +{ + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + if (sockfd == -1) + panic("socket creation error"); + + struct sockaddr_in serv_addr; + bzero(&serv_addr, sizeof(serv_addr)); + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); + serv_addr.sin_port = htons(8888); + + if (connect(sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) == -1) + panic("socket connect error"); + + for (;;) { + char buf[1024]; + bzero(&buf, sizeof(buf)); + scanf("%s", buf); + ssize_t n_write_bytes = write(sockfd, buf, sizeof(buf)); + if (n_write_bytes == -1) { + printf("socket already disconnected, cannot write any more!\n"); + break; + } + + ssize_t n_read_bytes = read(sockfd, buf, sizeof(buf)); + if (n_read_bytes > 0) { + printf("message from server: %s\n", buf); + } else if (n_read_bytes == 0) { + printf("server socket disconnected!\n"); + break; + } else if (n_read_bytes == -1) { + close(sockfd); + panic("socket read error"); + } + } +} diff --git a/src/connection.c b/src/connection.c new file mode 100644 index 0000000..c164ec8 --- /dev/null +++ b/src/connection.c @@ -0,0 +1,58 @@ +#include +#include +#include +#include + +#include "connection.h" +#include "constant.h" + +#define READ_BUFFER_SIZE 1024 + +struct connection { + struct tsocket *sock; +}; + +struct connection *connection_create(struct tsocket *sock) +{ + struct connection *conn = malloc(sizeof(*conn)); + conn->sock = sock; + + return conn; +} + +void connection_destroy(struct connection *conn) +{ + tsocket_destroy(conn->sock); + free(conn); +} + +static void echo(struct connection *conn) +{ + char buf[READ_BUFFER_SIZE]; + ssize_t n_read_bytes; + + struct tsocket *sock = (struct tsocket *)conn->sock; + + for (;;) { + n_read_bytes = read(sock->fd, buf, sizeof(buf)); + if (n_read_bytes > 0) { + printf("message from conn %d: %s\n", sock->fd, buf); + write(sock->fd, buf, sizeof(buf)); + } else if (n_read_bytes == 0) { + printf("conn %d disconnected\n", sock->fd); + return; + } else if (n_read_bytes == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) + break; + } + } +} + +event_t *connection_create_event(struct tsocket *sock) +{ + connection_t *conn = connection_create(sock); + return event_create(conn, conn->sock->fd, + (evloop_process_func_t) echo, + (evloop_destroy_func_t) connection_destroy); +} + diff --git a/src/connection.h b/src/connection.h new file mode 100644 index 0000000..4e14487 --- /dev/null +++ b/src/connection.h @@ -0,0 +1,15 @@ +#include "tsocket.h" +#include "evloop.h" + +#ifndef __CONNECTION_H +#define __CONNECTION_H + +struct connection; +typedef struct connection connection_t; + +connection_t *connection_create(struct tsocket *sock); +void connection_destroy(struct connection *conn); + +event_t *connection_create_event(struct tsocket *sock); + +#endif diff --git a/src/constant.h b/src/constant.h new file mode 100644 index 0000000..1bb6bf4 --- /dev/null +++ b/src/constant.h @@ -0,0 +1,7 @@ +#ifndef __CONSTANT_H +#define __CONSTANT_H + +#define ERROR -1 +#define OK 0 + +#endif diff --git a/src/evloop.c b/src/evloop.c new file mode 100644 index 0000000..5bed1e9 --- /dev/null +++ b/src/evloop.c @@ -0,0 +1,89 @@ +#include +#include + +#include "evloop.h" +#include "util.h" +#include "constant.h" +#include "tpool.h" + +struct evloop { + int epollfd; + struct epoll_event events[EVENT_LOOP_MAX_EVENTS]; + int size; +}; + +evloop_t *evloop_create() +{ + int epollfd = epoll_create1(0); + if (epollfd == -1) + return NULL; + + evloop_t *eventLoop = malloc(sizeof(*eventLoop)); + eventLoop->epollfd = epollfd; + eventLoop->size = EVENT_LOOP_MAX_EVENTS; + + return eventLoop; +} + +int evloop_wait(evloop_t *el, int timeout) +{ + return epoll_wait(el->epollfd, el->events, el->size, timeout); +} + +int evloop_add(evloop_t *el, event_t *ev, int flag) +{ + struct epoll_event epev; + epev.events = flag; + epev.data.ptr = ev; + + return epoll_ctl(el->epollfd, EPOLL_CTL_ADD, ev->fd, &epev); +} + +event_t *evloop_get(evloop_t *el, int index) +{ + return (event_t *)el->events[index].data.ptr; +} + +void evloop_loop(evloop_t *el) +{ + int nevents; + event_t *ev; + + for (;;) { + nevents = evloop_wait(el, -1); + if (nevents == -1) + panic("eventloop wait"); + + for (int i = 0; i < nevents; i++) { + ev = evloop_get(el, i); + // TODO: detect if there is any issue of the handle function + // and delete the event when issues happen + tpool_add_work(tpool, ev->process, ev->data); + } + } +} + +int evloop_remove(evloop_t *el, event_t *ev) +{ + if (epoll_ctl(el->epollfd, EPOLL_CTL_DEL, ev->fd, NULL) == -1) + return ERROR; + + if (ev->destroy) + ev->destroy(ev->data); + free(ev); + + return OK; +} + +event_t *event_create(void *data, int fd, + evloop_process_func_t process, + evloop_destroy_func_t destroy) +{ + event_t *ev = malloc(sizeof(*ev)); + ev->data = data; + ev->process = process; + ev->fd = fd; + ev->destroy = destroy; + + return ev; +} diff --git a/src/evloop.h b/src/evloop.h new file mode 100644 index 0000000..f0d4f72 --- /dev/null +++ b/src/evloop.h @@ -0,0 +1,32 @@ +#include "tsocket.h" + +#ifndef __EVLOOP_H +#define __EVLOOP_H + +#define EVENT_LOOP_MAX_EVENTS 1024 + +struct evloop; +typedef struct evloop evloop_t; + +typedef void (*evloop_process_func_t)(void *data); +typedef void (*evloop_destroy_func_t)(void *data); + +struct event { + int fd; + void *data; + evloop_destroy_func_t destroy; + evloop_process_func_t process; +}; +typedef struct event event_t; + +evloop_t *evloop_create(); +int evloop_wait(evloop_t *el, int timeout); +int evloop_add(evloop_t *el, event_t *ev, int flag); +int evloop_remove(evloop_t *el, event_t *ev); +event_t *evloop_get(evloop_t *el, int index); +void evloop_loop(evloop_t *el); +event_t *event_create(void *data, int fd, + evloop_process_func_t process, + evloop_destroy_func_t destroy); + +#endif diff --git a/src/server.c b/src/server.c new file mode 100644 index 0000000..8525689 --- /dev/null +++ b/src/server.c @@ -0,0 +1,40 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "evloop.h" +#include "tsocket.h" +#include "util.h" +#include "acceptor.h" +#include "tpool.h" + +int main() +{ + evloop_t *el = evloop_create(); + if (el == NULL) + panic("eventloop creation"); + + struct tsocket *sock = tsocket_create(); + if (sock == NULL + || tsocket_bind(sock, "127.0.0.1", 8888) == -1 + || tsocket_listen(sock) == -1) + panic("socket creation"); + + tpool = tpool_create(0); + if (!tpool) + panic("tpool_create"); + + event_t *acceptEvent = conn_acceptor_create_event(sock, el); + if (evloop_add(el, acceptEvent, EPOLLIN) == -1) + panic("eventloop add fd"); + + evloop_loop(el); +} diff --git a/src/tpool.c b/src/tpool.c new file mode 100644 index 0000000..1aa5986 --- /dev/null +++ b/src/tpool.c @@ -0,0 +1,186 @@ +#include +#include +#include "tpool.h" + +struct tpool_work { + thread_func_t func; + void *arg; + struct tpool_work *next; +}; +typedef struct tpool_work tpool_work_t; + +tpool_t *tpool; + +static tpool_work_t *tpool_work_create(thread_func_t func, void *arg) +{ + tpool_work_t *work; + + if (func == NULL) + return NULL; + + work = (tpool_work_t *)malloc(sizeof(*work)); + work->func = func; + work->arg = arg; + work->next = NULL; + return work; +} + +static void tpool_work_destroy(tpool_work_t *work) +{ + if (work == NULL) + return; + + free(work); +} + +struct tpool { + tpool_work_t *work_first; + tpool_work_t *work_last; + pthread_mutex_t work_mutex; + pthread_cond_t work_cond; + pthread_cond_t working_cond; + size_t working_cnt; + size_t thread_cnt; + bool stop; +}; + +static tpool_work_t *tpool_pop(tpool_t *tp) +{ + if (tp == NULL) + return NULL; + + tpool_work_t *work = tp->work_first; + if (work == NULL) + return NULL; + + tp->work_first = work->next; + if (work == tp->work_last) + tp->work_last = NULL; + + return work; +} + +static void *tpool_worker(void *arg) +{ + tpool_t *tp = (tpool_t *)arg; + tpool_work_t *work; + + for (;;) { + pthread_mutex_lock(&tp->work_mutex); + + while (tp->work_first == NULL && !tp->stop) + pthread_cond_wait(&tp->work_cond, &tp->work_mutex); + + if (tp->stop) + break; + + work = tpool_pop(tp); + tp->working_cnt++; + pthread_mutex_unlock(&tp->work_mutex); + + if (work != NULL) { + work->func(work->arg); + tpool_work_destroy(work); + } + + pthread_mutex_lock(&tp->work_mutex); + tp->working_cnt--; + if (!tp->stop && tp->working_cnt == 0 && tp->work_first == NULL) + pthread_cond_signal(&tp->working_cond); + pthread_mutex_unlock(&tp->work_mutex); + } + + tp->thread_cnt--; + pthread_cond_signal(&tp->working_cond); + pthread_mutex_unlock(&tp->work_mutex); + + return NULL; +} + +tpool_t *tpool_create(size_t num) +{ + tpool_t *tp; + pthread_t thread; + + if (num == 0) + num = 12; + + tp = (tpool_t *)malloc(sizeof(*tp)); + tp->thread_cnt = num; + tp->work_first = tp->work_last = NULL; + + pthread_mutex_init(&tp->work_mutex, NULL); + pthread_cond_init(&tp->work_cond, NULL); + pthread_cond_init(&tp->working_cond, NULL); + + for (int i = 0; i < num; i++) { + pthread_create(&thread, NULL, tpool_worker, tp); + pthread_detach(thread); + } + + return tp; +} + +void tpool_destroy(tpool_t *tp) +{ + tpool_work_t *work, *next_work; + + if (tp == NULL) + return; + + pthread_mutex_lock(&tp->work_mutex); + work = tp->work_first; + while (work != NULL) { + next_work = work->next; + tpool_work_destroy(work); + work = next_work; + } + tp->stop = true; + + pthread_cond_broadcast(&tp->work_cond); + pthread_mutex_unlock(&tp->work_mutex); + + tpool_wait(tp); + + pthread_mutex_destroy(&tp->work_mutex); + pthread_cond_destroy(&tp->work_cond); + pthread_cond_destroy(&tp->working_cond); + + free(tp); +} + +bool tpool_add_work(tpool_t *tp, thread_func_t func, void *arg) +{ + tpool_work_t *work; + + if (tp == NULL) + return false; + + work = tpool_work_create(func, arg); + if (work == NULL) + return false; + + pthread_mutex_lock(&tp->work_mutex); + if (!tp->work_first) + tp->work_first = tp->work_last = work; + else + tp->work_last = tp->work_last->next = work; + + pthread_cond_broadcast(&tp->work_cond); + pthread_mutex_unlock(&tp->work_mutex); + + return true; +} + +void tpool_wait(tpool_t *tp) +{ + if (tp == NULL) + return; + + pthread_mutex_lock(&tp->work_mutex); + + while ((!tp->stop && tp->working_cnt != 0) || (tp->stop && tp->thread_cnt != 0)) + pthread_cond_wait(&tp->working_cond, &tp->work_mutex); + + pthread_mutex_unlock(&tp->work_mutex); +} diff --git a/src/tpool.h b/src/tpool.h new file mode 100644 index 0000000..e1079ef --- /dev/null +++ b/src/tpool.h @@ -0,0 +1,20 @@ +#include +#include + +#ifndef __TPOOL_H +#define __TPOOL_H + +struct tpool; +typedef struct tpool tpool_t; + +extern tpool_t *tpool; + +typedef void (*thread_func_t)(void *arg); + +tpool_t *tpool_create(size_t num); +void tpool_destroy(tpool_t *tp); + +bool tpool_add_work(tpool_t *tp, thread_func_t func, void *arg); +void tpool_wait(tpool_t *tp); + +#endif diff --git a/src/tsocket.c b/src/tsocket.c new file mode 100644 index 0000000..8d8038a --- /dev/null +++ b/src/tsocket.c @@ -0,0 +1,59 @@ +#include +#include +#include +#include + +#include "tsocket.h" + +struct tsocket *tsocket_create() +{ + int fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd == -1) + return NULL; + + struct tsocket *sock = malloc(sizeof(*sock)); + sock->fd = fd; + + return sock; +} + +int tsocket_bind(struct tsocket *sock, const char *addr, int hostport) +{ + struct sockaddr_in sock_addr; + sock_addr.sin_family = AF_INET; + sock_addr.sin_addr.s_addr = inet_addr(addr); + sock_addr.sin_port = htons(hostport); + + sock->addr = addr; + sock->port = hostport; + + return bind(sock->fd, (struct sockaddr *)&sock_addr, sizeof(sock_addr)); +} + +int tsocket_listen(struct tsocket *sock) +{ + return listen(sock->fd, SOMAXCONN); +} + +struct tsocket *tsocket_accept(struct tsocket *sock) +{ + struct sockaddr_in addr; + socklen_t addr_len = sizeof(addr); + + int fd = accept(sock->fd, (struct sockaddr *)&addr, &addr_len); + if (fd == -1) + return NULL; + + struct tsocket *conn_sock = malloc(sizeof(*conn_sock)); + conn_sock->fd = fd; + conn_sock->addr = inet_ntoa(addr.sin_addr); + conn_sock->port = ntohs(addr.sin_port); + + return conn_sock; +} + +void tsocket_destroy(struct tsocket *sock) +{ + close(sock->fd); + free(sock); +} diff --git a/src/tsocket.h b/src/tsocket.h new file mode 100644 index 0000000..5028b04 --- /dev/null +++ b/src/tsocket.h @@ -0,0 +1,19 @@ +#include + +#ifndef __TSOCKET_H +#define __TSOCKET_H + +struct tsocket { + int fd; + const char *addr; + int port; +}; +typedef struct tsocket tsocket_t; + +struct tsocket *tsocket_create(); +int tsocket_bind(struct tsocket *sock, const char *addr, int hostport); +int tsocket_listen(struct tsocket *sock); +struct tsocket *tsocket_accept(struct tsocket *sock); +void tsocket_destroy(struct tsocket *sock); + +#endif diff --git a/src/util.c b/src/util.c new file mode 100644 index 0000000..b4053fd --- /dev/null +++ b/src/util.c @@ -0,0 +1,23 @@ +#include +#include +#include +#include +#include + +#include "util.h" +#include "constant.h" + +void panic(const char *msg) +{ + perror(msg); + exit(EXIT_FAILURE); +} + +int setblocking(int fd, bool blocking) +{ + u_long mode = blocking ? 0 : 1; + if (ioctl(fd, FIONBIO, &mode) == -1) + return ERROR; + + return OK; +} diff --git a/src/util.h b/src/util.h new file mode 100644 index 0000000..850a245 --- /dev/null +++ b/src/util.h @@ -0,0 +1,9 @@ +#include + +#ifndef __UTIL_H +#define __UTIL_H + +void panic(const char *); +int setblocking(int fd, bool blocking); + +#endif -- cgit v1.2.3