From: Zach Brown, Jeff Moyer, Suparna Bhattacharya Previously there have been (complicated and scary) attempts to funnel individual aio events down epoll or vice versa. This instead lets one issue an entire sys_epoll_wait() as an aio op. You'd set up epoll as usual and then issue epoll_wait aio ops which would complete once epoll events had been copied. This will enable a single io_getevents() event loop to process both disk AIO and epoll notifications. Thanks to Arjan van de Ven for helping figure out how to resolve the lockdep complaints. Both ctx->lock and ep->lock can be held in certain wait queue callback routines, thus being nested inside q->lock. However, this excludes ctx->wait or ep->wq wait queues, which can safely be nested inside ctx->lock or ep->lock respectively. So we teach lockdep to recognize these as distinct classes. Signed-off-by: Zach Brown Signed-off-by: Jeff Moyer Signed-off-by: Suparna Bhattacharya --- linux-2.6.19-rc4-aio-root/include/linux/aio.h | 2 linux-2.6.19-rc4-aio-root/include/linux/aio_abi.h | 1 linux-2.6.19-rc4-aio-root/include/linux/sched.h | 2 --- linux-2.6.20-rc1-root/fs/aio.c | 54 +++++++++++++ linux-2.6.20-rc1-root/fs/eventpoll.c | 95 +++++++++++++++++++++--- linux-2.6.20-rc1-root/include/linux/aio.h | 2 linux-2.6.20-rc1-root/include/linux/aio_abi.h | 1 linux-2.6.20-rc1-root/include/linux/eventpoll.h | 31 +++++++ linux-2.6.20-rc1-root/include/linux/sched.h | 2 linux-2.6.20-rc1-root/kernel/timer.c | 21 +++++ 7 files changed, 196 insertions(+), 10 deletions(-) diff -puN fs/aio.c~aio-epoll-wait fs/aio.c --- linux-2.6.20-rc1/fs/aio.c~aio-epoll-wait 2006-12-21 08:52:05.000000000 +0530 +++ linux-2.6.20-rc1-root/fs/aio.c 2006-12-21 08:52:06.000000000 +0530 @@ -30,6 +30,7 @@ #include #include #include +#include #include #include @@ -193,6 +194,8 @@ static int aio_setup_ring(struct kioctx kunmap_atomic((void *)((unsigned long)__event & PAGE_MASK), km); \ } while(0) +static struct lock_class_key ioctx_wait_queue_head_lock_key; + /* 
ioctx_alloc * Allocates and initializes an ioctx. Returns an ERR_PTR if it failed. */ @@ -224,6 +227,8 @@ static struct kioctx *ioctx_alloc(unsign spin_lock_init(&ctx->ctx_lock); spin_lock_init(&ctx->ring_info.ring_lock); init_waitqueue_head(&ctx->wait); + /* Teach lockdep to recognize this lock as a different class */ + lockdep_set_class(&ctx->wait.lock, &ioctx_wait_queue_head_lock_key); INIT_LIST_HEAD(&ctx->active_reqs); INIT_LIST_HEAD(&ctx->run_list); @@ -1401,6 +1406,42 @@ static ssize_t aio_setup_single_vector(s return 0; } +/* Uses iocb->ki_private */ +void aio_free_iocb_timer(struct kiocb *iocb) +{ + struct timer_list *timer = (struct timer_list *)iocb->private; + + if (timer) { + del_timer(timer); + kfree(timer); + iocb->private = NULL; + } +} + +/* Uses iocb->private */ +int aio_setup_iocb_timer(struct kiocb *iocb, unsigned long expires, + void (*function)(unsigned long)) +{ + struct timer_list *timer; + + if (iocb->private) + return -EEXIST; + + timer = kmalloc(sizeof(struct timer_list), GFP_KERNEL); + if (!timer) + return -ENOMEM; + + init_timer(timer); + timer->function = function; + timer->data = (unsigned long)iocb; + timer->expires = expires; + + iocb->private = timer; + iocb->ki_dtor = aio_free_iocb_timer; + return 0; +} + + /* * aio_setup_iocb: * Performs the initial checks and aio retry method @@ -1486,6 +1527,19 @@ static ssize_t aio_setup_iocb(struct kio if (file->f_op->aio_fsync) kiocb->ki_retry = aio_fsync; break; + case IOCB_CMD_EPOLL_WAIT: + /* + * Note that we unconditionally allocate a timer, but we + * only use it if a timeout was specified. Otherwise, it + * is just a holder for the "infinite" value. 
+ */ + ret = aio_setup_iocb_timer(kiocb, ep_relative_ms_to_jiffies( + kiocb->ki_pos), eventpoll_aio_timer); + if (unlikely(ret)) + break; + kiocb->ki_retry = eventpoll_aio_wait; + kiocb->ki_cancel = eventpoll_aio_cancel; + break; default: dprintk("EINVAL: io_submit: no operation provided\n"); ret = -EINVAL; diff -puN fs/eventpoll.c~aio-epoll-wait fs/eventpoll.c --- linux-2.6.20-rc1/fs/eventpoll.c~aio-epoll-wait 2006-12-21 08:52:05.000000000 +0530 +++ linux-2.6.20-rc1-root/fs/eventpoll.c 2006-12-21 08:52:06.000000000 +0530 @@ -35,6 +35,7 @@ #include #include #include +#include #include #include #include @@ -642,6 +643,75 @@ eexit_1: return error; } +/* + * Called when the eventpoll timer expires or a cancellation + * occurs for an aio_epoll_wait. It is enough for this function to + * trigger a wakeup on the eventpoll waitqueue. The aio_wake_function() + * callback will pull out the wait queue entry and kick the iocb so that + * the rest gets taken care of in aio_run_iocb->aio_epoll_wait which + * can recognize the cancelled state or timeout expiration and do + * the right thing. 
+ */ +void eventpoll_aio_timer(unsigned long data) +{ + struct kiocb *iocb = (struct kiocb *)data; + struct timer_list *timer = iocb_timer(iocb); + struct file *file = iocb->ki_filp; + struct eventpoll *ep = (struct eventpoll *)file->private_data; + unsigned long flags; + + if (timer) + del_timer(timer); + write_lock_irqsave(&ep->lock, flags); + /* because ep->lock also protects ep->wq */ + __wake_up_locked(&ep->wq, TASK_UNINTERRUPTIBLE | TASK_INTERRUPTIBLE); + write_unlock_irqrestore(&ep->lock, flags); +} + + +int eventpoll_aio_cancel(struct kiocb *iocb, struct io_event *event) +{ + /* to wakeup the iocb, so actual cancellation happens aio_run_iocb */ + eventpoll_aio_timer((unsigned long)iocb); + + event->res = event->res2 = 0; + /* drop the cancel reference */ + aio_put_req(iocb); + + return 0; +} + +/* + * iocb->ki_nbytes -- number of events + * iocb->ki_pos -- relative timeout in milliseconds + * iocb->private -- timer with absolute timeout in jiffies + */ +ssize_t eventpoll_aio_wait(struct kiocb *iocb) +{ + struct file *file = iocb->ki_filp; + ssize_t ret = -EINVAL; + unsigned long expires; + struct timer_list *timer = iocb_timer(iocb); + + if (!is_file_epoll(file) || iocb->ki_nbytes > EP_MAX_EVENTS || + iocb->ki_nbytes <= 0) + return -EINVAL; + + expires = timer->expires; + ret = ep_poll(file->private_data, + (struct epoll_event __user *)iocb->ki_buf, + iocb->ki_nbytes, ep_jiffies_to_relative_ms(expires)); + + /* + * If a timeout was specified, ep_poll returned retry, and we have + * not yet registered a timer, go ahead and register one. + */ + if (ret == -EIOCBRETRY) { + mod_timer(timer, expires); + } + + return ret; +} /* * Implement the event wait interface for the eventpoll file. 
It is the kernel @@ -824,6 +894,7 @@ eexit_1: return error; } +static struct lock_class_key eventpoll_wait_queue_head_lock_key; static int ep_alloc(struct eventpoll **pep) { @@ -835,6 +906,9 @@ static int ep_alloc(struct eventpoll **p rwlock_init(&ep->lock); init_rwsem(&ep->sem); init_waitqueue_head(&ep->wq); + /* Teach lockdep to recognize this lock as a different class */ + lockdep_set_class(&ep->wq.lock, &eventpoll_wait_queue_head_lock_key); + init_waitqueue_head(&ep->poll_wait); INIT_LIST_HEAD(&ep->rdllist); ep->rbr = RB_ROOT; @@ -1549,7 +1623,7 @@ static int ep_poll(struct eventpoll *ep, int res, eavail; unsigned long flags; long jtimeout; - wait_queue_t wait; + wait_queue_t *wait = current->io_wait; /* * Calculate the timeout by checking for the "infinite" value ( -1 ) @@ -1569,16 +1643,13 @@ retry: * We need to sleep here, and we will be wake up by * ep_poll_callback() when events will become available. */ - init_waitqueue_entry(&wait, current); - __add_wait_queue(&ep->wq, &wait); - for (;;) { /* * We don't want to sleep if the ep_poll_callback() sends us * a wakeup in between. That's why we set the task state * to TASK_INTERRUPTIBLE before doing the checks. */ - set_current_state(TASK_INTERRUPTIBLE); + prepare_to_wait(&ep->wq, wait, TASK_INTERRUPTIBLE); if (!list_empty(&ep->rdllist) || !jtimeout) break; if (signal_pending(current)) { @@ -1587,12 +1658,16 @@ retry: } write_unlock_irqrestore(&ep->lock, flags); - jtimeout = schedule_timeout(jtimeout); + if ((jtimeout = schedule_timeout_wait(jtimeout, wait)) + < 0) { + if ((res = jtimeout) == -EIOCBRETRY) + goto out; + } + if (res < 0) + break; write_lock_irqsave(&ep->lock, flags); } - __remove_wait_queue(&ep->wq, &wait); - - set_current_state(TASK_RUNNING); + finish_wait(&ep->wq, wait); } /* Is it worth to try to dig for events ? 
*/ @@ -1608,7 +1683,7 @@ retry: if (!res && eavail && !(res = ep_events_transfer(ep, events, maxevents)) && jtimeout) goto retry; - +out: return res; } diff -puN include/linux/aio_abi.h~aio-epoll-wait include/linux/aio_abi.h --- linux-2.6.20-rc1/include/linux/aio_abi.h~aio-epoll-wait 2006-12-21 08:52:05.000000000 +0530 +++ linux-2.6.20-rc1-root/include/linux/aio_abi.h 2006-12-21 08:52:06.000000000 +0530 @@ -43,6 +43,7 @@ enum { IOCB_CMD_NOOP = 6, IOCB_CMD_PREADV = 7, IOCB_CMD_PWRITEV = 8, + IOCB_CMD_EPOLL_WAIT = 9, }; /* read() from /dev/aio returns these structures. */ diff -puN include/linux/aio.h~aio-epoll-wait include/linux/aio.h --- linux-2.6.20-rc1/include/linux/aio.h~aio-epoll-wait 2006-12-21 08:52:06.000000000 +0530 +++ linux-2.6.20-rc1-root/include/linux/aio.h 2006-12-21 08:52:06.000000000 +0530 @@ -244,6 +244,8 @@ do { \ #define io_wait_to_kiocb(io_wait) container_of(container_of(io_wait, \ struct wait_bit_queue, wait), struct kiocb, ki_wait) +#define iocb_timer(iocb) ((struct timer_list *)((iocb)->private)) + #include diff -puN include/linux/eventpoll.h~aio-epoll-wait include/linux/eventpoll.h --- linux-2.6.20-rc1/include/linux/eventpoll.h~aio-epoll-wait 2006-12-21 08:52:06.000000000 +0530 +++ linux-2.6.20-rc1-root/include/linux/eventpoll.h 2006-12-21 08:52:06.000000000 +0530 @@ -48,6 +48,33 @@ struct epoll_event { /* Forward declarations to avoid compiler errors */ struct file; +/* Maximum msec timeout value storeable in a long int */ +#define EP_MAX_MSTIMEO min(1000ULL * MAX_SCHEDULE_TIMEOUT / HZ, (LONG_MAX - 999ULL) / HZ) + +static inline int ep_jiffies_to_relative_ms(unsigned long expires) +{ + int relative_ms = 0; + unsigned long now = jiffies; + + if (expires == MAX_SCHEDULE_TIMEOUT) + relative_ms = EP_MAX_MSTIMEO; + else if (time_before(now, expires)) + relative_ms = jiffies_to_msecs(expires - now); + + return relative_ms; +} + +static inline unsigned long ep_relative_ms_to_jiffies(int relative_ms) +{ + unsigned long expires; + + if (relative_ms < 
0 || relative_ms >= EP_MAX_MSTIMEO) + expires = MAX_SCHEDULE_TIMEOUT; + else + expires = jiffies + msecs_to_jiffies(relative_ms); + return expires; +} + #ifdef CONFIG_EPOLL @@ -90,6 +117,10 @@ static inline void eventpoll_release(str eventpoll_release_file(file); } +extern void eventpoll_aio_timer(unsigned long data); +extern int eventpoll_aio_cancel(struct kiocb *iocb, struct io_event *event); +extern ssize_t eventpoll_aio_wait(struct kiocb *iocb); + #else static inline void eventpoll_init_file(struct file *file) {} diff -puN include/linux/sched.h~aio-epoll-wait include/linux/sched.h --- linux-2.6.20-rc1/include/linux/sched.h~aio-epoll-wait 2006-12-21 08:52:06.000000000 +0530 +++ linux-2.6.20-rc1-root/include/linux/sched.h 2006-12-21 08:52:06.000000000 +0530 @@ -247,6 +247,8 @@ extern int in_sched_functions(unsigned l #define MAX_SCHEDULE_TIMEOUT LONG_MAX extern signed long FASTCALL(schedule_timeout(signed long timeout)); +extern signed long FASTCALL(schedule_timeout_wait(signed long timeout, + wait_queue_t *wait)); extern signed long schedule_timeout_interruptible(signed long timeout); extern signed long schedule_timeout_uninterruptible(signed long timeout); asmlinkage void schedule(void); diff -puN kernel/timer.c~aio-epoll-wait kernel/timer.c --- linux-2.6.20-rc1/kernel/timer.c~aio-epoll-wait 2006-12-21 08:52:06.000000000 +0530 +++ linux-2.6.20-rc1-root/kernel/timer.c 2006-12-21 08:52:06.000000000 +0530 @@ -1369,6 +1369,27 @@ fastcall signed long __sched schedule_ti EXPORT_SYMBOL(schedule_timeout); /* + * Same as schedule_timeout, except that it checks the wait queue context + * passed in, and in case of an asynchronous waiter it does not sleep, + * but returns -EIOCBRETRY to allow the operation to be retried later when + * notified, unless it has been cancelled in which case it returns -EINTR + */ +fastcall signed long __sched schedule_timeout_wait(signed long timeout, + wait_queue_t *wait) +{ + struct kiocb *iocb; + if (is_sync_wait(wait)) + return 
schedule_timeout(timeout); + + iocb = io_wait_to_kiocb(wait); + if (kiocbIsCancelled(iocb)) + return -EINTR; + + return -EIOCBRETRY; +} + + +/* * We can use __set_current_state() here because schedule_timeout() calls * schedule() unconditionally. */ _