From d41d2341dcd206782bf468732a84cc95772936fa Mon Sep 17 00:00:00 2001 From: FUJITA Tomonori Date: Wed, 16 Dec 2009 09:53:41 +0900 Subject: [PATCH] add non signalfd support Signed-off-by: FUJITA Tomonori --- collie/Makefile | 4 + collie/work.c | 250 +++++++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 230 insertions(+), 24 deletions(-) diff --git a/collie/Makefile b/collie/Makefile index a3316e1..1e779ad 100644 --- a/collie/Makefile +++ b/collie/Makefile @@ -4,6 +4,10 @@ CFLAGS += -g -O2 -Wall -Wstrict-prototypes -I../include CFLAGS += -D_GNU_SOURCE LIBS += -lpthread -lcrypto -lcpg +ifneq ($(shell test -e /usr/include/linux/signalfd.h && echo 1),) +CFLAGS += -DUSE_SIGNALFD +endif + PROGRAMS = collie COLLIE_OBJS = collie.o net.o vdi.o group.o store.o work.o ../lib/event.o ../lib/net.o ../lib/logger.o COLLIE_DEP = $(COLLIE_OBJS:.o=.d) diff --git a/collie/work.c b/collie/work.c index a0b7fda..885d70a 100644 --- a/collie/work.c +++ b/collie/work.c @@ -28,8 +28,6 @@ #include #include #include -#define _LINUX_FCNTL_H -#include #include "list.h" #include "util.h" @@ -37,8 +35,6 @@ #include "logger.h" #include "event.h" -extern int signalfd(int fd, const sigset_t *mask, int flags); - struct worker_info { pthread_t worker_thread[NR_WORKER_THREAD]; @@ -56,11 +52,139 @@ struct worker_info { int sig_fd; + int command_fd[2]; + int done_fd[2]; + + pthread_cond_t finished_cond; + struct list_head ack_list; + pthread_t ack_thread; + int stop; }; static struct worker_info __wi; +#if defined(__NR_signalfd) && defined(USE_SIGNALFD) + +/* + * workaround for broken linux/signalfd.h including + * usr/include/linux/fcntl.h + */ +#define _LINUX_FCNTL_H + +#include + +static inline int __signalfd(int fd, const sigset_t *mask, int flags) +{ + int fd2, ret; + + fd2 = syscall(__NR_signalfd, fd, mask, _NSIG / 8); + if (fd2 < 0) + return fd2; + + ret = fcntl(fd2, F_GETFL); + if (ret < 0) { + close(fd2); + return -1; + } + + ret = fcntl(fd2, F_SETFL, ret | O_NONBLOCK); + if (ret < 0) { + close(fd2); + return -1; + } + + return fd2; +} +#else +#define __signalfd(fd, mask, flags) (-1) +struct signalfd_siginfo { +}; +#endif + +static void *bs_thread_ack_fn(void *arg) +{ + struct worker_info *wi = arg; + int command, ret, nr; + struct work *work; + +retry: + ret = read(wi->command_fd[0], &command, sizeof(command)); + if (ret < 0) { + eprintf("ack pthread will be dead, %m\n"); + if (errno == EAGAIN || errno == EINTR) + goto retry; + + goto out; + } + + if (wi->stop) + goto out; + + pthread_mutex_lock(&wi->finished_lock); +retest: + if (list_empty(&wi->finished_list)) { + pthread_cond_wait(&wi->finished_cond, &wi->finished_lock); + goto retest; + } + + while (!list_empty(&wi->finished_list)) { + work = list_first_entry(&wi->finished_list, + struct work, w_list); + + list_del(&work->w_list); + list_add_tail(&work->w_list, &wi->ack_list); + } + + pthread_mutex_unlock(&wi->finished_lock); + + nr = 1; +rewrite: + ret = write(wi->done_fd[1], &nr, sizeof(nr)); + if (ret < 0) { + eprintf("can't ack tgtd, %m\n"); + if (errno == EAGAIN || errno == EINTR) + goto rewrite; + + goto out; + } + + goto retry; +out: + pthread_exit(NULL); +} + +static void bs_thread_nonsig_request_done(int fd, int events, void *data) +{ + struct worker_info *wi = data; + struct work *work; + int nr_events, ret; + + ret = read(wi->done_fd[0], &nr_events, sizeof(nr_events)); + if (ret < 0) { + eprintf("wrong wakeup\n"); + return; + } + + while (!list_empty(&wi->ack_list)) { + work = list_first_entry(&wi->ack_list, + struct work, w_list); + + list_del(&work->w_list); + work->done(work, 0); + } + +rewrite: + ret = write(wi->command_fd[1], &nr_events, sizeof(nr_events)); + if (ret < 0) { + eprintf("can't write done, %m\n"); + if (errno == EAGAIN || errno == EINTR) + goto rewrite; + + return; + } +} + static void bs_thread_request_done(int fd, int events, void *data) { int ret; @@ -120,48 +244,94 @@ retest: work->fn(work, idx); - pthread_mutex_lock(&wi->finished_lock); - list_add_tail(&work->w_list, &wi->finished_list); - pthread_mutex_unlock(&wi->finished_lock); + if (wi->sig_fd < 0) { + pthread_mutex_lock(&wi->finished_lock); + list_add_tail(&work->w_list, &wi->finished_list); + pthread_mutex_unlock(&wi->finished_lock); + + pthread_cond_signal(&wi->finished_cond); + } else { + pthread_mutex_lock(&wi->finished_lock); + list_add_tail(&work->w_list, &wi->finished_list); + pthread_mutex_unlock(&wi->finished_lock); - kill(getpid(), SIGUSR2); + kill(getpid(), SIGUSR2); + } } pthread_exit(NULL); } +static int init_signalfd(struct worker_info *wi) +{ + sigset_t mask; + int fd, ret; + + sigemptyset(&mask); + sigaddset(&mask, SIGUSR2); + sigprocmask(SIG_BLOCK, &mask, NULL); + + fd = __signalfd(-1, &mask, 0); + if (fd < 0) { + wi->sig_fd = -1; + return -1; + } + + ret = register_event(fd, bs_thread_request_done, wi); + if (ret) { + eprintf("failed to add epoll event\n"); + close(fd); + wi->sig_fd = -1; + return -1; + } + + wi->sig_fd = fd; + return 0; +} + int init_worker(void) { int i, ret; - sigset_t mask; struct worker_info *wi = &__wi; INIT_LIST_HEAD(&wi->pending_list); INIT_LIST_HEAD(&wi->finished_list); + INIT_LIST_HEAD(&wi->ack_list); + pthread_cond_init(&wi->finished_cond, NULL); pthread_cond_init(&wi->pending_cond, NULL); pthread_mutex_init(&wi->finished_lock, NULL); pthread_mutex_init(&wi->pending_lock, NULL); pthread_mutex_init(&wi->startup_lock, NULL); - sigemptyset(&mask); - sigaddset(&mask, SIGUSR2); - sigprocmask(SIG_BLOCK, &mask, NULL); + ret = init_signalfd(wi); + if (ret) { + ret = pipe(wi->command_fd); + if (ret) { + eprintf("failed to create command pipe, %m\n"); + goto destroy_cond_mutex; + } - wi->sig_fd = signalfd(-1, &mask, 0); - if (wi->sig_fd < 0) { - eprintf("failed to create a signal fd, %m\n"); - return 1; - } + ret = pipe(wi->done_fd); + if (ret) { + eprintf("failed to done command pipe, %m\n"); + goto close_command_fd; + } - ret = fcntl(wi->sig_fd, F_GETFL); - ret = fcntl(wi->sig_fd, F_SETFL, ret | O_NONBLOCK); + ret = register_event(wi->done_fd[0], bs_thread_nonsig_request_done, wi); + if (ret) { + eprintf("failed to add epoll event\n"); + goto close_done_fd; + } - ret = register_event(wi->sig_fd, bs_thread_request_done, wi); - if (ret) { - eprintf("failed to add epoll event\n"); - goto destroy_cond_mutex; + ret = pthread_create(&wi->ack_thread, NULL, bs_thread_ack_fn, wi); + if (ret) { + eprintf("failed to create an ack thread, %s\n", strerror(ret)); + goto event_del; + } + + eprintf("non signalfd mode\n"); } pthread_mutex_lock(&wi->startup_lock); @@ -178,6 +348,16 @@ int init_worker(void) } pthread_mutex_unlock(&wi->startup_lock); + if (wi->sig_fd < 0) { + rewrite: + ret = write(wi->command_fd[1], &ret, sizeof(ret)); + if (ret < 0) { + eprintf("can't write done, %m\n"); + if (errno == EAGAIN || errno == EINTR) + goto rewrite; + } + } + return 0; destroy_threads: @@ -187,8 +367,17 @@ destroy_threads: pthread_join(wi->worker_thread[i - 1], NULL); eprintf("stopped the worker thread %d\n", i - 1); } +event_del: + if (wi->sig_fd < 0) + unregister_event(wi->done_fd[0]); unregister_event(wi->sig_fd); +close_done_fd: + close(wi->done_fd[0]); + close(wi->done_fd[1]); +close_command_fd: + close(wi->command_fd[0]); + close(wi->command_fd[1]); destroy_cond_mutex: pthread_cond_destroy(&wi->pending_cond); pthread_mutex_destroy(&wi->pending_lock); @@ -203,6 +392,11 @@ void exit_worker(void) int i; struct worker_info *wi = &__wi; + if (wi->sig_fd < 0) { + pthread_cancel(wi->ack_thread); + pthread_join(wi->ack_thread, NULL); + } + wi->stop = 1; pthread_cond_broadcast(&wi->pending_cond); @@ -215,7 +409,15 @@ void exit_worker(void) pthread_mutex_destroy(&wi->startup_lock); pthread_mutex_destroy(&wi->finished_lock); - unregister_event(wi->sig_fd); + if (wi->sig_fd < 0) { + pthread_cond_destroy(&wi->finished_cond); + + close(wi->done_fd[0]); + close(wi->done_fd[1]); + close(wi->command_fd[0]); + close(wi->command_fd[1]); + } else + unregister_event(wi->sig_fd); wi->stop = 0; } -- 1.5.6.5