Re: [PATCH -mm 4/4][AIO] - Listio support

From: Bharata B Rao
Date: Mon Nov 27 2006 - 08:39:58 EST


On 11/20/06, Sébastien Dugué <sebastien.dugue@xxxxxxxx> wrote:

POSIX listio support

This patch adds POSIX listio completion notification support. It builds
on support provided by the aio signal notification patch and adds an
IOCB_CMD_GROUP command to io_submit().


I have altered this patch to provide the listio support through a
separate syscall.

The code is in RFC stage at the moment and is only compile-tested on
i386. I have refrained from modifying unistd.h for now in order to
avoid touching too many files. I plan to do actual tests (which
involve user space changes to the aio libraries) only if there is
agreement on this syscall approach.

More details about the interface are in the patch itself.

Regards,
Bharata.

This patch adds POSIX listio completion notification support. It builds
on support provided by the aio signal notification patch and adds a
system call lio_submit().

lio_submit() is similar to io_submit() except that it takes two more
arguments: mode and sigevent.

mode can be LIO_WAIT or LIO_NOWAIT. The sigevent argument specifies
the notification mechanism to use on listio completion. lio_submit()
waits within the syscall if LIO_WAIT is specified; the sigevent argument
is ignored in that case.

A struct lio_event is added in include/linux/aio.h

A struct lio_event *ki_lio member is added to struct kiocb in include/linux/aio.h

In lio_submit(), a lio_event is created in lio_create() which contains the
necessary information for signaling a thread (signal number, pid, notify type
and value) along with a count of requests attached to this event.

The following depicts the lio_event structure:

struct lio_event {
atomic_t lio_users;
struct aio_notify lio_notify;
};

lio_users holds an atomic counter of the number of requests attached to this
lio. It is incremented with each request submitted and decremented at each
request completion. When the counter reaches 0, we send the notification.

In aio_complete(), if the request is attached to an lio (ki_lio != NULL),
then lio_check() is called to decrement the lio_users count and, once all
the requests in the group have completed, signal the user process.

Sebastien Dugue's listio patch has been modified to arrive at this patch.

Signed-off-by: Sebastien Dugue <sebastien.dugue@xxxxxxxx>
Signed-off-by: Laurent Vivier <laurent.vivier@xxxxxxxx>
Signed-off-by: Bharata B Rao <bharata.rao@xxxxxxxxx>
---

fs/aio.c | 181 +++++++++++++++++++++++++++++++++++++++++------
fs/compat.c | 29 +++++++
include/linux/aio.h | 13 ++-
include/linux/aio_abi.h | 5 +
include/linux/syscalls.h | 2
5 files changed, 204 insertions(+), 26 deletions(-)

diff -puN fs/aio.c~aio-listio-support fs/aio.c
--- linux-2.6.19-rc5-mm2/fs/aio.c~aio-listio-support 2006-11-23 19:05:33.000000000 +0530
+++ linux-2.6.19-rc5-mm2-bharata/fs/aio.c 2006-11-25 11:24:55.000000000 +0530
@@ -413,6 +413,7 @@ static struct kiocb fastcall *__aio_get_
req->ki_ctx = ctx;
req->ki_cancel = NULL;
req->ki_retry = NULL;
+ req->ki_lio = NULL;
req->ki_dtor = NULL;
req->private = NULL;
req->ki_iovec = NULL;
@@ -1010,6 +1011,59 @@ out_unlock:
return -EINVAL;
}

+static inline void lio_check(struct lio_event *lio)
+{
+ int ret;
+
+ ret = atomic_dec_and_test(&lio->lio_users);
+
+ if (unlikely(ret) && lio->lio_notify.notify != SIGEV_NONE) {
+ /* last one -> notify process */
+ if (aio_send_signal(&lio->lio_notify))
+ sigqueue_free(lio->lio_notify.sigq);
+ kfree(lio);
+ }
+}
+
+static struct lio_event *lio_create(struct sigevent __user *user_event,
+ int mode)
+{
+ int ret = 0;
+ struct lio_event *lio = NULL;
+
+ if (unlikely((mode == LIO_NOWAIT) && !user_event))
+ return lio;
+
+ lio = kzalloc(sizeof(*lio), GFP_KERNEL);
+
+ if (!lio)
+ return ERR_PTR(-EAGAIN);
+
+ /*
+ * Grab an initial ref on the lio to avoid races between
+ * submission and completion.
+ */
+ atomic_set(&lio->lio_users, 1);
+
+ lio->lio_notify.notify = SIGEV_NONE;
+
+ /* sigevent argument is ignored with LIO_WAIT */
+ if (user_event && (mode == LIO_NOWAIT)) {
+ /*
+ * User specified an event for this lio,
+ * he wants to be notified upon lio completion.
+ */
+ ret = aio_setup_sigevent(&lio->lio_notify, user_event);
+
+ if (ret) {
+ kfree(lio);
+ return ERR_PTR(ret);
+ }
+ }
+
+ return lio;
+}
+
/* aio_complete
* Called when the io request on the given iocb is complete.
* Returns true if this is the last user of the request. The
@@ -1058,6 +1112,8 @@ int fastcall aio_complete(struct kiocb *
* when the event got cancelled.
*/
if (kiocbIsCancelled(iocb)) {
+ if (iocb->ki_lio)
+ lio_check(iocb->ki_lio);
if (iocb->ki_notify.sigq)
sigqueue_free(iocb->ki_notify.sigq);
goto put_rq;
@@ -1100,6 +1156,14 @@ int fastcall aio_complete(struct kiocb *
sigqueue_free(iocb->ki_notify.sigq);
}

+ /*
+ * In case of listio, in addition to the optional per-iocb sigevent
+ * notification (as above), the listio as a whole can also generate
+ * a sigevent notification.
+ */
+ if (iocb->ki_lio)
+ lio_check(iocb->ki_lio);
+
pr_debug("%ld retries: %zd of %zd\n", iocb->ki_retried,
iocb->ki_nbytes - iocb->ki_left, iocb->ki_nbytes);
put_rq:
@@ -1634,7 +1698,7 @@ static int aio_wake_function(wait_queue_
}

int fastcall io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
- struct iocb *iocb)
+ struct iocb *iocb, struct lio_event *lio)
{
struct kiocb *req;
struct file *file;
@@ -1690,12 +1754,13 @@ int fastcall io_submit_one(struct kioctx

if (iocb->aio_sigeventp) {
ret = aio_setup_sigevent(&req->ki_notify,
- (struct sigevent __user *)(unsigned long)
- iocb->aio_sigeventp);
+ (struct sigevent __user *)(unsigned long)
+ iocb->aio_sigeventp);
if (ret)
goto out_put_req;
}

+ req->ki_lio = lio;
ret = aio_setup_iocb(req);

if (ret)
@@ -1723,6 +1788,48 @@ out_put_req:
return ret;
}

+static int io_submit_group(struct kioctx *ctx, long nr,
+ struct iocb __user * __user *iocbpp, struct lio_event *lio)
+{
+ int i;
+ long ret = 0;
+
+ /*
+ * AKPM: should this return a partial result if some of the IOs were
+ * successfully submitted?
+ */
+ for (i = 0; i < nr; i++) {
+ struct iocb __user *user_iocb;
+ struct iocb tmp;
+
+ if (unlikely(__get_user(user_iocb, iocbpp + i))) {
+ ret = -EFAULT;
+ break;
+ }
+
+ if (unlikely(copy_from_user(&tmp, user_iocb, sizeof(tmp)))) {
+ ret = -EFAULT;
+ break;
+ }
+
+ if (lio)
+ atomic_inc(&lio->lio_users);
+
+ ret = io_submit_one(ctx, user_iocb, &tmp, lio);
+ if (ret) {
+ if (lio) {
+ /*
+ * In case of listio, continue with
+ * the subsequent requests
+ */
+ atomic_dec(&lio->lio_users);
+ } else
+ break;
+ }
+ }
+ return i ? i : ret;
+}
+
/* sys_io_submit:
* Queue the nr iocbs pointed to by iocbpp for processing. Returns
* the number of iocbs queued. May return -EINVAL if the aio_context
@@ -1740,7 +1847,6 @@ asmlinkage long sys_io_submit(aio_contex
{
struct kioctx *ctx;
long ret = 0;
- int i;

if (unlikely(nr < 0))
return -EINVAL;
@@ -1754,31 +1860,60 @@ asmlinkage long sys_io_submit(aio_contex
return -EINVAL;
}

- /*
- * AKPM: should this return a partial result if some of the IOs were
- * successfully submitted?
- */
- for (i=0; i<nr; i++) {
- struct iocb __user *user_iocb;
- struct iocb tmp;
+ ret = io_submit_group(ctx, nr, iocbpp, NULL);

- if (unlikely(__get_user(user_iocb, iocbpp + i))) {
- ret = -EFAULT;
- break;
- }
+ put_ioctx(ctx);
+ return ret;
+}

- if (unlikely(copy_from_user(&tmp, user_iocb, sizeof(tmp)))) {
- ret = -EFAULT;
- break;
- }
+asmlinkage long sys_lio_submit(aio_context_t ctx_id, int mode, long nr,
+ struct iocb __user * __user *iocbpp, struct sigevent __user *event)
+{
+ struct kioctx *ctx;
+ struct lio_event *lio = NULL;
+ long ret = 0;

- ret = io_submit_one(ctx, user_iocb, &tmp);
- if (ret)
- break;
+ if (unlikely(nr < 0))
+ return -EINVAL;
+
+ if (unlikely(!access_ok(VERIFY_READ, iocbpp, (nr*sizeof(*iocbpp)))))
+ return -EFAULT;
+
+ ctx = lookup_ioctx(ctx_id);
+ if (unlikely(!ctx)) {
+ pr_debug("EINVAL: lio_submit: invalid context id\n");
+ return -EINVAL;
+ }
+
+ lio = lio_create(event, mode);
+
+ ret = PTR_ERR(lio);
+ if (IS_ERR(lio))
+ goto out_put_ctx;
+
+ ret = io_submit_group(ctx, nr, iocbpp, lio);
+
+ /* If we failed to submit even one request just return */
+ if (ret < 0 ) {
+ if (lio)
+ kfree(lio);
+ goto out_put_ctx;
+ }
+
+ /*
+ * Drop extra ref on the lio now that we're done submitting requests.
+ */
+ if (lio)
+ lio_check(lio);
+
+ if (mode == LIO_WAIT) {
+ wait_event(ctx->wait, atomic_read(&lio->lio_users) == 0);
+ kfree(lio);
}

+out_put_ctx:
put_ioctx(ctx);
- return i ? i : ret;
+ return ret;
}

/* lookup_kiocb
diff -puN include/linux/aio_abi.h~aio-listio-support include/linux/aio_abi.h
--- linux-2.6.19-rc5-mm2/include/linux/aio_abi.h~aio-listio-support 2006-11-23 21:56:14.000000000 +0530
+++ linux-2.6.19-rc5-mm2-bharata/include/linux/aio_abi.h 2006-11-23 21:56:55.000000000 +0530
@@ -45,6 +45,11 @@ enum {
IOCB_CMD_PWRITEV = 8,
};

+enum {
+ LIO_WAIT = 0,
+ LIO_NOWAIT = 1,
+};
+
/* read() from /dev/aio returns these structures. */
struct io_event {
__u64 data; /* the data field from the iocb */
diff -puN include/linux/aio.h~aio-listio-support include/linux/aio.h
--- linux-2.6.19-rc5-mm2/include/linux/aio.h~aio-listio-support 2006-11-23 21:57:23.000000000 +0530
+++ linux-2.6.19-rc5-mm2-bharata/include/linux/aio.h 2006-11-23 22:01:11.000000000 +0530
@@ -58,6 +58,11 @@ struct aio_notify {
struct sigqueue *sigq;
};

+struct lio_event {
+ atomic_t lio_users;
+ struct aio_notify lio_notify;
+};
+
/* is there a better place to document function pointer methods? */
/**
* ki_retry - iocb forward progress callback
@@ -112,7 +117,8 @@ struct kiocb {
__u64 ki_user_data; /* user's data for completion */
wait_queue_t ki_wait;
loff_t ki_pos;
-
+ /* lio this iocb might be attached to */
+ struct lio_event *ki_lio;
void *private;
/* State that we remember to be able to restart/retry */
unsigned short ki_opcode;
@@ -220,12 +226,13 @@ struct mm_struct;
extern void FASTCALL(exit_aio(struct mm_struct *mm));
extern struct kioctx *lookup_ioctx(unsigned long ctx_id);
extern int FASTCALL(io_submit_one(struct kioctx *ctx,
- struct iocb __user *user_iocb, struct iocb *iocb));
+ struct iocb __user *user_iocb, struct iocb *iocb,
+ struct lio_event *lio));

/* semi private, but used by the 32bit emulations: */
struct kioctx *lookup_ioctx(unsigned long ctx_id);
int FASTCALL(io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
- struct iocb *iocb));
+ struct iocb *iocb, struct lio_event *lio));

#define get_ioctx(kioctx) do { \
BUG_ON(atomic_read(&(kioctx)->users) <= 0); \
diff -puN fs/compat.c~aio-listio-support fs/compat.c
--- linux-2.6.19-rc5-mm2/fs/compat.c~aio-listio-support 2006-11-24 08:52:42.000000000 +0530
+++ linux-2.6.19-rc5-mm2-bharata/fs/compat.c 2006-11-24 09:53:56.000000000 +0530
@@ -678,6 +678,35 @@ compat_sys_io_submit(aio_context_t ctx_i
return ret;
}

+asmlinkage long
+compat_sys_lio_submit(aio_context_t ctx_id, int mode, int nr, u32 __user *iocb,
+ struct compat_sigevent __user *sig_user)
+{
+ struct iocb __user * __user *iocb64;
+ struct sigvent __user *event = NULL;
+ long ret;
+
+ if (unlikely(nr < 0))
+ return -EINVAL;
+
+ if (nr > MAX_AIO_SUBMITS)
+ nr = MAX_AIO_SUBMITS;
+
+ iocb64 = compat_alloc_user_space(nr * sizeof(*iocb64));
+ ret = copy_iocb(nr, iocb, iocb64);
+ if (ret)
+ return ret;
+
+ if (sig_user) {
+ struct sigevent kevent;
+ event = compat_alloc_user_space(sizeof(struct sigevent));
+ if (get_compat_sigevent(&kevent, sig_user) ||
+ copy_to_user(event, &kevent, sizeof(struct sigevent)))
+ return -EFAULT;
+ }
+ return sys_lio_submit(ctx_id, mode, nr, iocb64, event);
+}
+
struct compat_ncp_mount_data {
compat_int_t version;
compat_uint_t ncp_fd;
diff -puN include/linux/syscalls.h~aio-listio-support include/linux/syscalls.h
--- linux-2.6.19-rc5-mm2/include/linux/syscalls.h~aio-listio-support 2006-11-24 09:42:44.000000000 +0530
+++ linux-2.6.19-rc5-mm2-bharata/include/linux/syscalls.h 2006-11-24 09:43:42.000000000 +0530
@@ -319,6 +319,8 @@ asmlinkage long sys_io_getevents(aio_con
struct timespec __user *timeout);
asmlinkage long sys_io_submit(aio_context_t, long,
struct iocb __user * __user *);
+asmlinkage long sys_lio_submit(aio_context_t, int, long,
+ struct iocb __user * __user *, struct sigevent __user *);
asmlinkage long sys_io_cancel(aio_context_t ctx_id, struct iocb __user *iocb,
struct io_event __user *result);
asmlinkage ssize_t sys_sendfile(int out_fd, int in_fd,
_