[take37 10/10] kevent: Kevent based generic AIO.

From: Evgeniy Polyakov
Date: Thu Feb 22 2007 - 08:58:46 EST



Kevent based generic AIO.

This patch only implements network AIO, which is _COMPLETELY_
impossible and broken in _ANY_ micro-thread design. For details
and a test, see the following link:
http://tservice.net.ru/~s0mbre/blog/2007/02/10#2007_02_10

Designing AIO without the network in mind can only be the result of a heavy hangover.

Kevent AIO is implemented as state machine.
There is a patch which implements async open/send_header/sendfile/close.

aio_sendfile()/aio_sendfile_path() consist of two major parts: the AIO
state machine and the page processing code.
The former is just a small subsystem which allows callbacks to be queued
for invocation in process context on behalf of a pool of kernel
threads. It allows caches of callbacks to be queued to the local thread
or to any other specified thread. Each cache of callbacks is processed
as long as there are callbacks in it; callbacks can requeue themselves
into the same cache.

Real work is being done in the page processing code - code which populates
pages into the VFS cache and then sends pages to the destination socket
via ->sendpage(). Unlike the previous aio_sendfile() implementation, the new
one does not require low-level filesystem specific callbacks (->get_block())
at all; instead I extended struct address_space_operations to contain a new
member called ->aio_readpages(), which is exactly the same as ->readpages()
(read: mpage_readpages()) except for different BIO allocation and submission
routines. I changed mpage_readpages() to pass mpage_alloc() and
mpage_bio_submit() to the new function called __mpage_readpages(), which is
exactly the old mpage_readpages() with invocation of the provided callbacks
instead of direct use of the old functions. mpage_readpages_aio() provides
kevent specific callbacks, which call the old functions, but with different
destructor callbacks, which are essentially the same, except that they
reschedule AIO processing.

aio_sendfile_path() is essentially aio_sendfile(), except that it takes
the source filename as a parameter, takes a pointer to a private header
and its size (which allows sending the header and the file's content in one
syscall instead of three (open, send, sendfile)), and returns the opened
file descriptor.

Benchmark of a transfer of 100 1MB files (files are already in the VFS cache)
using sync sendfile() against aio_sendfile_path() shows about 10MB/sec
performance win (78 MB/s vs 66-72 MB/s over 1 Gb network, sendfile sending
server is one-way AMD Athlon 64 3500+) for aio_sendfile_path().

The AIO state machine is a base for network AIO (which becomes
quite trivial), but I will not start the implementation until the
roadmap of kevent as a whole and of the AIO implementation becomes more clear.

Signed-off-by: Evgeniy Polyakov <johnpol@xxxxxxxxxxx>

diff --git a/fs/bio.c b/fs/bio.c
index 7618bcb..291e7e8 100644
--- a/fs/bio.c
+++ b/fs/bio.c
@@ -120,7 +120,7 @@ void bio_free(struct bio *bio, struct bio_set *bio_set)
/*
* default destructor for a bio allocated with bio_alloc_bioset()
*/
-static void bio_fs_destructor(struct bio *bio)
+void bio_fs_destructor(struct bio *bio)
{
bio_free(bio, fs_bio_set);
}
diff --git a/fs/ext3/inode.c b/fs/ext3/inode.c
index 8a824f4..910dfa6 100644
--- a/fs/ext3/inode.c
+++ b/fs/ext3/inode.c
@@ -1650,6 +1650,13 @@ ext3_readpages(struct file *file, struct address_space *mapping,
return mpage_readpages(mapping, pages, nr_pages, ext3_get_block);
}

+/*
+ * AIO counterpart of ext3_readpages(): forwards to mpage_readpages_aio()
+ * with ext3_get_block and passes @priv through unchanged.
+ * @file is not used here.
+ */
+static int
+ext3_readpages_aio(struct file *file, struct address_space *mapping,
+ struct list_head *pages, unsigned nr_pages, void *priv)
+{
+ return mpage_readpages_aio(mapping, pages, nr_pages, ext3_get_block, priv);
+}
+
static void ext3_invalidatepage(struct page *page, unsigned long offset)
{
journal_t *journal = EXT3_JOURNAL(page->mapping->host);
@@ -1768,6 +1775,7 @@ static int ext3_journalled_set_page_dirty(struct page *page)
}

static const struct address_space_operations ext3_ordered_aops = {
+ .aio_readpages = ext3_readpages_aio,
.readpage = ext3_readpage,
.readpages = ext3_readpages,
.writepage = ext3_ordered_writepage,
diff --git a/fs/mpage.c b/fs/mpage.c
index 692a3e5..e5ba44b 100644
--- a/fs/mpage.c
+++ b/fs/mpage.c
@@ -102,7 +102,7 @@ static struct bio *mpage_bio_submit(int rw, struct bio *bio)
static struct bio *
mpage_alloc(struct block_device *bdev,
sector_t first_sector, int nr_vecs,
- gfp_t gfp_flags)
+ gfp_t gfp_flags, void *priv)
{
struct bio *bio;

@@ -116,6 +116,7 @@ mpage_alloc(struct block_device *bdev,
if (bio) {
bio->bi_bdev = bdev;
bio->bi_sector = first_sector;
+ bio->bi_private = priv;
}
return bio;
}
@@ -175,7 +176,10 @@ map_buffer_to_page(struct page *page, struct buffer_head *bh, int page_block)
static struct bio *
do_mpage_readpage(struct bio *bio, struct page *page, unsigned nr_pages,
sector_t *last_block_in_bio, struct buffer_head *map_bh,
- unsigned long *first_logical_block, get_block_t get_block)
+ unsigned long *first_logical_block, get_block_t get_block,
+ struct bio *(*alloc)(struct block_device *bdev, sector_t first_sector,
+ int nr_vecs, gfp_t gfp_flags, void *priv),
+ struct bio *(*submit)(int rw, struct bio *bio), void *priv)
{
struct inode *inode = page->mapping->host;
const unsigned blkbits = inode->i_blkbits;
@@ -302,25 +306,25 @@ do_mpage_readpage(struct bio *bio, struct page *page, unsigned nr_pages,
* This page will go to BIO. Do we need to send this BIO off first?
*/
if (bio && (*last_block_in_bio != blocks[0] - 1))
- bio = mpage_bio_submit(READ, bio);
+ bio = submit(READ, bio);

alloc_new:
if (bio == NULL) {
- bio = mpage_alloc(bdev, blocks[0] << (blkbits - 9),
+ bio = alloc(bdev, blocks[0] << (blkbits - 9),
min_t(int, nr_pages, bio_get_nr_vecs(bdev)),
- GFP_KERNEL);
+ GFP_KERNEL, priv);
if (bio == NULL)
goto confused;
}

length = first_hole << blkbits;
if (bio_add_page(bio, page, length, 0) < length) {
- bio = mpage_bio_submit(READ, bio);
+ bio = submit(READ, bio);
goto alloc_new;
}

if (buffer_boundary(map_bh) || (first_hole != blocks_per_page))
- bio = mpage_bio_submit(READ, bio);
+ bio = submit(READ, bio);
else
*last_block_in_bio = blocks[blocks_per_page - 1];
out:
@@ -328,7 +332,7 @@ out:

confused:
if (bio)
- bio = mpage_bio_submit(READ, bio);
+ bio = submit(READ, bio);
if (!PageUptodate(page))
block_read_full_page(page, get_block);
else
@@ -336,6 +340,48 @@ confused:
goto out;
}

+/*
+ * Common body shared by mpage_readpages() and mpage_readpages_aio().
+ *
+ * Takes @nr_pages pages off the tail of @pages one at a time, tries to
+ * insert each into @mapping's page cache and hands it to
+ * do_mpage_readpage().  BIOs are allocated through @alloc (which also
+ * receives @priv) and sent through @submit.  Pages that could not be added
+ * to the page cache are released.  Always returns 0.
+ */
+int
+__mpage_readpages(struct address_space *mapping, struct list_head *pages,
+ unsigned nr_pages, get_block_t get_block,
+ struct bio *(*alloc)(struct block_device *bdev, sector_t first_sector,
+ int nr_vecs, gfp_t gfp_flags, void *priv),
+ struct bio *(*submit)(int rw, struct bio *bio),
+ void *priv)
+{
+ struct bio *bio = NULL;
+ unsigned page_idx;
+ sector_t last_block_in_bio = 0;
+ struct pagevec lru_pvec;
+ struct buffer_head map_bh;
+ unsigned long first_logical_block = 0;
+
+ clear_buffer_mapped(&map_bh);
+ pagevec_init(&lru_pvec, 0);
+ for (page_idx = 0; page_idx < nr_pages; page_idx++) {
+ struct page *page = list_entry(pages->prev, struct page, lru);
+
+ prefetchw(&page->flags);
+ list_del(&page->lru);
+ if (!add_to_page_cache(page, mapping,
+ page->index, GFP_KERNEL)) {
+ bio = do_mpage_readpage(bio, page,
+ nr_pages - page_idx,
+ &last_block_in_bio, &map_bh,
+ &first_logical_block,
+ get_block, alloc, submit, priv);
+ if (!pagevec_add(&lru_pvec, page))
+ __pagevec_lru_add(&lru_pvec);
+ } else {
+ page_cache_release(page);
+ }
+ }
+ pagevec_lru_add(&lru_pvec);
+ BUG_ON(!list_empty(pages));
+ /* Send off whatever is still accumulated in the last BIO. */
+ if (bio)
+ submit(READ, bio);
+ return 0;
+}
+
/**
* mpage_readpages - populate an address space with some pages, and
* start reads against them.
@@ -386,40 +432,28 @@ int
mpage_readpages(struct address_space *mapping, struct list_head *pages,
unsigned nr_pages, get_block_t get_block)
{
- struct bio *bio = NULL;
- unsigned page_idx;
- sector_t last_block_in_bio = 0;
- struct pagevec lru_pvec;
- struct buffer_head map_bh;
- unsigned long first_logical_block = 0;
+ return __mpage_readpages(mapping, pages, nr_pages, get_block,
+ mpage_alloc, mpage_bio_submit, NULL);
+}
+EXPORT_SYMBOL(mpage_readpages);

- clear_buffer_mapped(&map_bh);
- pagevec_init(&lru_pvec, 0);
- for (page_idx = 0; page_idx < nr_pages; page_idx++) {
- struct page *page = list_entry(pages->prev, struct page, lru);
+#ifdef CONFIG_KEVENT_AIO
+extern struct bio *kaio_mpage_alloc(struct block_device *bdev, sector_t first_sector,
+ int nr_vecs, gfp_t gfp_flags, void *priv);
+extern struct bio *kaio_mpage_bio_submit(int rw, struct bio *bio);
+#else
+#define kaio_mpage_alloc mpage_alloc
+#define kaio_mpage_bio_submit mpage_bio_submit
+#endif

- prefetchw(&page->flags);
- list_del(&page->lru);
- if (!add_to_page_cache(page, mapping,
- page->index, GFP_KERNEL)) {
- bio = do_mpage_readpage(bio, page,
- nr_pages - page_idx,
- &last_block_in_bio, &map_bh,
- &first_logical_block,
- get_block);
- if (!pagevec_add(&lru_pvec, page))
- __pagevec_lru_add(&lru_pvec);
- } else {
- page_cache_release(page);
- }
- }
- pagevec_lru_add(&lru_pvec);
- BUG_ON(!list_empty(pages));
- if (bio)
- mpage_bio_submit(READ, bio);
- return 0;
+/*
+ * Same as mpage_readpages(), except that BIOs are allocated and submitted
+ * through kaio_mpage_alloc()/kaio_mpage_bio_submit() and @priv is handed
+ * to the allocation callback.  Without CONFIG_KEVENT_AIO those two names
+ * are defined to the regular mpage_alloc()/mpage_bio_submit().
+ */
+int
+mpage_readpages_aio(struct address_space *mapping, struct list_head *pages,
+ unsigned nr_pages, get_block_t get_block, void *priv)
+{
+ return __mpage_readpages(mapping, pages, nr_pages, get_block,
+ kaio_mpage_alloc, kaio_mpage_bio_submit, priv);
}
-EXPORT_SYMBOL(mpage_readpages);
+EXPORT_SYMBOL(mpage_readpages_aio);

/*
* This isn't called much at all
@@ -433,7 +467,8 @@ int mpage_readpage(struct page *page, get_block_t get_block)

clear_buffer_mapped(&map_bh);
bio = do_mpage_readpage(bio, page, 1, &last_block_in_bio,
- &map_bh, &first_logical_block, get_block);
+ &map_bh, &first_logical_block, get_block,
+ mpage_alloc, mpage_bio_submit, NULL);
if (bio)
mpage_bio_submit(READ, bio);
return 0;
@@ -595,7 +630,7 @@ page_is_mapped:
alloc_new:
if (bio == NULL) {
bio = mpage_alloc(bdev, blocks[0] << (blkbits - 9),
- bio_get_nr_vecs(bdev), GFP_NOFS|__GFP_HIGH);
+ bio_get_nr_vecs(bdev), GFP_NOFS|__GFP_HIGH, NULL);
if (bio == NULL)
goto confused;
}
diff --git a/include/linux/mpage.h b/include/linux/mpage.h
index cc5fb75..accdbdd 100644
--- a/include/linux/mpage.h
+++ b/include/linux/mpage.h
@@ -16,6 +16,8 @@ typedef int (writepage_t)(struct page *page, struct writeback_control *wbc);

int mpage_readpages(struct address_space *mapping, struct list_head *pages,
unsigned nr_pages, get_block_t get_block);
+int mpage_readpages_aio(struct address_space *mapping, struct list_head *pages,
+ unsigned nr_pages, get_block_t get_block, void *priv);
int mpage_readpage(struct page *page, get_block_t get_block);
int mpage_writepages(struct address_space *mapping,
struct writeback_control *wbc, get_block_t get_block);
diff --git a/kernel/kevent/kevent_aio.c b/kernel/kevent/kevent_aio.c
new file mode 100644
index 0000000..9bb5e99
--- /dev/null
+++ b/kernel/kevent/kevent_aio.c
@@ -0,0 +1,939 @@
+/*
+ * 2006 Copyright (c) Evgeniy Polyakov <johnpol@xxxxxxxxxxx>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#include <linux/kernel.h>
+#include <linux/module.h>
+#include <linux/types.h>
+#include <linux/spinlock.h>
+#include <linux/list.h>
+#include <linux/kthread.h>
+#include <linux/socket.h>
+#include <linux/slab.h>
+#include <linux/bio.h>
+#include <linux/pagemap.h>
+#include <linux/file.h>
+#include <linux/swap.h>
+#include <linux/kevent.h>
+
+#define KAIO_CALL_NUM 8
+#define KAIO_THREAD_NUM 8
+
+//#define KEVENT_AIO_DEBUG
+
+#ifdef KEVENT_AIO_DEBUG
+#define dprintk(f, a...) printk(f, ##a)
+#else
+#define dprintk(f, a...) do {} while (0)
+#endif
+
+/*
+ * State of one worker: a queue of requests and the kernel thread
+ * which processes them.
+ */
+struct kaio_thread
+{
+ struct list_head req_list; /* queued struct kaio_req, linked via req_entry */
+ spinlock_t req_lock; /* taken around every req_list modification */
+ struct task_struct *thread; /* task returned by kthread_run() */
+ int refcnt; /* plain counter, see kaio_get_thread()/kaio_put_thread() */
+ wait_queue_head_t wait; /* woken up by kaio_schedule_req() */
+};
+
+extern struct file_operations kevent_user_fops;
+static DEFINE_PER_CPU(u32, kaio_req_counter);
+static DEFINE_PER_CPU(int, kaio_req_cpu);
+
+/*
+ * Array of working threads.
+ * It can only be accessed under RCU protection,
+ * so threads reference counters are not atomic.
+ */
+static struct kaio_thread *kaio_threads[KAIO_THREAD_NUM] __read_mostly;
+static struct kmem_cache *kaio_req_cache __read_mostly;
+static struct kmem_cache *kaio_priv_cache __read_mostly;
+
+struct kaio_req;
+typedef int (* kaio_callback)(struct kaio_req *req, int direct);
+
+#define KAIO_REQ_PENDING 0
+
+/*
+ * Cache of kaio request callbacks.
+ * It is not allowed to change the same cache entry
+ * simultaneously (for example it is forbidden to add entries
+ * in parallel).
+ *
+ * When a cache entry is scheduled for execution in one of the threads,
+ * it is forbidden to access it, since it will be freed when
+ * all callbacks have been invoked.
+ *
+ * It is possible to add callbacks into this cache from the callbacks
+ * themselves.
+ */
+struct kaio_req
+{
+ struct list_head req_entry; /* link in kaio_thread->req_list */
+ kaio_callback call[KAIO_CALL_NUM]; /* ring of pending callbacks */
+ int read_idx, add_idx; /* next slot to invoke / next slot to fill */
+ int cpu; /* index into kaio_threads[], -1 means round-robin */
+ long flags; /* KAIO_REQ_PENDING bit */
+ atomic_t refcnt; /* see kaio_req_get()/kaio_req_put() */
+ void (*destructor)(struct kaio_req *); /* run by kaio_req_put() before freeing, may be NULL */
+ void *priv; /* owner data; struct kaio_private for sendfile requests */
+};
+
+/*
+ * Returns pointer to thread entry for given index and bumps its
+ * reference counter, or NULL if the index is out of range or no thread
+ * is registered in that slot.
+ *
+ * An index of -1 selects the next slot in round-robin order using the
+ * per-cpu kaio_req_cpu cursor.
+ *
+ * Must be called under RCU protection.
+ */
+static struct kaio_thread *kaio_get_thread(int cpu)
+{
+	struct kaio_thread *th;
+
+	if (cpu == -1) {
+		int *cnt = &__get_cpu_var(kaio_req_cpu);
+
+		cpu = *cnt;
+
+		*cnt = *cnt + 1;
+		if (*cnt >= KAIO_THREAD_NUM)
+			*cnt = 0;
+	}
+
+	/* Negative indices must be rejected too: they index before the array. */
+	if (unlikely(cpu < 0 || cpu >= KAIO_THREAD_NUM))
+		return NULL;
+
+	th = kaio_threads[cpu];
+	if (unlikely(!th))
+		return NULL;
+
+	th->refcnt++;
+
+	return th;
+}
+
+/*
+ * Drops reference counter for given thread (counterpart of
+ * kaio_get_thread()).  This is a plain decrement, nothing is freed here.
+ * Must be called under RCU protection.
+ */
+static inline void kaio_put_thread(struct kaio_thread *th)
+{
+ th->refcnt--;
+}
+
+/*
+ * Queue @req to a worker thread and wake that thread up.
+ *
+ * The thread is selected by req->cpu; if no thread exists for that index,
+ * req->cpu is reset to -1 and the round-robin choice is used instead.
+ * The request is linked into the thread's list only if KAIO_REQ_PENDING
+ * was not already set in req->flags; the wake-up is issued either way.
+ */
+void kaio_schedule_req(struct kaio_req *req)
+{
+ struct kaio_thread *th;
+ unsigned long flags;
+
+ rcu_read_lock();
+ th = kaio_get_thread(req->cpu);
+ if (!th) {
+ req->cpu = -1;
+ th = kaio_get_thread(-1);
+ BUG_ON(!th);
+ }
+
+ /* NOTE(review): nothing in this file clears KAIO_REQ_PENDING again - confirm intended. */
+ if (!test_and_set_bit(KAIO_REQ_PENDING, &req->flags)) {
+ spin_lock_irqsave(&th->req_lock, flags);
+ list_add_tail(&req->req_entry, &th->req_list);
+ spin_unlock_irqrestore(&th->req_lock, flags);
+ }
+
+ wake_up(&th->wait);
+
+ kaio_put_thread(th);
+ rcu_read_unlock();
+}
+
+EXPORT_SYMBOL_GPL(kaio_schedule_req);
+
+/* Take one reference on @req. */
+static inline void kaio_req_get(struct kaio_req *req)
+{
+ atomic_inc(&req->refcnt);
+}
+
+/*
+ * Drop one reference on @req.
+ * When the last reference goes away the destructor (if any) is invoked
+ * and the request is returned to kaio_req_cache.
+ * Returns 1 if the request was freed and must not be touched anymore,
+ * 0 otherwise.
+ */
+static inline int kaio_req_put(struct kaio_req *req)
+{
+ if (atomic_dec_and_test(&req->refcnt)) {
+ dprintk("%s: freeing req: %p, priv: %p.\n", __func__, req, req->priv);
+ if (req->destructor)
+ req->destructor(req);
+ kmem_cache_free(kaio_req_cache, req);
+ return 1;
+ }
+
+ return 0;
+}
+
+/*
+ * Append a call request into cache.
+ * Returns -EOVERFLOW in case cache is full, and 0 otherwise.
+ *
+ * The callbacks are kept in a ring of KAIO_CALL_NUM slots; the ring is
+ * considered full when advancing add_idx would make it equal to read_idx,
+ * so one slot always stays unused.  Each successfully appended call takes
+ * one reference on the request.
+ */
+int kaio_append_call(struct kaio_req *req, kaio_callback call)
+{
+ if ((req->add_idx + 1 == req->read_idx) ||
+ ((req->add_idx + 1 == KAIO_CALL_NUM) && req->read_idx == 0))
+ return -EOVERFLOW;
+
+ req->call[req->add_idx] = call;
+
+ dprintk("%s: req: %p, read_idx: %d, add_idx: %d, call: %p [%p].\n",
+ __func__, req, req->read_idx, req->add_idx,
+ req->call[req->read_idx], req->call[req->add_idx]);
+ /* Advance the write index with wrap-around. */
+ if (++req->add_idx == KAIO_CALL_NUM)
+ req->add_idx = 0;
+
+ kaio_req_get(req);
+
+ return 0;
+}
+
+EXPORT_SYMBOL_GPL(kaio_append_call);
+
+/*
+ * Adds one call request into given cache.
+ * If cache is NULL or full, allocate new one.
+ *
+ * When the call fits into the existing @req, the request is scheduled
+ * for processing and returned.  Otherwise a fresh request is allocated
+ * with @gflags, bound to thread index @cpu (-1 for round-robin), holding
+ * @call in its first slot and one reference; the fresh request is NOT
+ * scheduled, the caller does that after filling in ->priv and
+ * ->destructor.
+ *
+ * Returns the request the call was stored in, or NULL if allocation failed.
+ */
+struct kaio_req *kaio_add_call(struct kaio_req *req, kaio_callback call, int cpu, gfp_t gflags)
+{
+	if (req && !kaio_append_call(req, call)) {
+		kaio_schedule_req(req);
+		return req;
+	}
+
+	req = kmem_cache_alloc(kaio_req_cache, gflags);
+	if (!req)
+		return NULL;
+
+	memset(req->call, 0, sizeof(req->call));
+
+	/*
+	 * The slab object is not zeroed: give every field a defined value,
+	 * kaio_req_put() reads ->priv and ->destructor.
+	 */
+	INIT_LIST_HEAD(&req->req_entry);
+	req->destructor = NULL;
+	req->priv = NULL;
+	req->cpu = cpu;
+	req->call[0] = call;
+	req->add_idx = 1;
+	req->read_idx = 0;
+	req->flags = 0;
+	atomic_set(&req->refcnt, 1);
+
+	dprintk("%s: req: %p, call: %p [%p].\n", __func__, req, call, req->call[0]);
+
+	return req;
+}
+
+EXPORT_SYMBOL_GPL(kaio_add_call);
+
+/*
+ * Call appropriate callbacks in cache.
+ * This can only be called by working threads, which means that cache
+ * is filled (probably partially) and is not even accessible from
+ * the originator of requests, which means that cache will be freed
+ * when all callbacks are invoked.
+ *
+ * Callback itself can reschedule new callback into the same cache.
+ *
+ * If callback returns negative value, the whole cache will be freed.
+ * If positive value is returned, then further processing is stopped,
+ * so cache can be queued into the end of the processing FIFO by callback.
+ * If zero is returned, next callback will be invoked if any.
+ *
+ * @freed is set to 1 when the reference dropped here was the last one,
+ * i.e. @req has been released and must not be touched by the caller
+ * anymore (zero is returned in that case), and to 0 otherwise.
+ */
+static int kaio_call(struct kaio_req *req, int *freed)
+{
+	int err = -EINVAL;
+
+	*freed = 0;
+
+	if (likely(req->add_idx != req->read_idx)) {
+		dprintk("%s: req: %p, read_idx: %d, add_idx: %d, call: %p [%p].\n",
+				__func__, req, req->read_idx, req->add_idx,
+				req->call[req->read_idx], req->call[0]);
+		err = (*req->call[req->read_idx])(req, 0);
+		if (++req->read_idx == KAIO_CALL_NUM)
+			req->read_idx = 0;
+
+		/* Each invoked callback gives back the reference it held. */
+		if (kaio_req_put(req)) {
+			*freed = 1;
+			err = 0;
+		}
+	}
+	return err;
+}
+
+/*
+ * Main loop of a worker thread.
+ *
+ * Repeatedly takes a request from the thread's list and runs its
+ * callbacks until the callback ring is empty, a callback returns non-zero,
+ * or the request gets freed.  A request whose callback returned a positive
+ * value is put back on the list.  @first remembers the first request seen
+ * in the current pass so that the pass ends once that request shows up
+ * again; the thread then sleeps for up to HZ jiffies or until woken up.
+ *
+ * NOTE(review): requests are taken from req_list.prev, i.e. from the same
+ * end list_add_tail() adds to, while the comment above kaio_call() talks
+ * about a FIFO - confirm which order is intended.
+ */
+static int kaio_thread_process(void *data)
+{
+	struct kaio_thread *th = data;
+	unsigned long flags;
+	struct kaio_req *req, *first;
+	DECLARE_WAITQUEUE(wait, current);
+	int err, freed;
+
+	add_wait_queue_exclusive(&th->wait, &wait);
+
+	while (!kthread_should_stop()) {
+		first = req = NULL;
+		do {
+			req = NULL;
+			spin_lock_irqsave(&th->req_lock, flags);
+			if (!list_empty(&th->req_list)) {
+				req = list_entry(th->req_list.prev, struct kaio_req, req_entry);
+				if (first != req)
+					list_del(&req->req_entry);
+			}
+			spin_unlock_irqrestore(&th->req_lock, flags);
+
+			if (!first)
+				first = req;
+			else if (first == req)
+				break;
+
+			if (req) {
+				err = 0;
+				freed = 0;
+				while ((req->read_idx != req->add_idx) && !kthread_should_stop()) {
+					dprintk("%s: req: %p, read_idx: %d, add_idx: %d, err: %d.\n",
+							__func__, req, req->read_idx, req->add_idx, err);
+					err = kaio_call(req, &freed);
+					/*
+					 * Once the request has been freed its
+					 * indexes must not be read again.
+					 */
+					if (freed || err != 0)
+						break;
+				}
+
+				if (freed) {
+					/* Do not keep a pointer to released memory around. */
+					if (first == req)
+						first = NULL;
+				} else if (err > 0) {
+					spin_lock_irqsave(&th->req_lock, flags);
+					list_add_tail(&req->req_entry, &th->req_list);
+					spin_unlock_irqrestore(&th->req_lock, flags);
+				}
+			}
+		} while (req);
+		__set_current_state(TASK_INTERRUPTIBLE);
+		schedule_timeout(HZ);
+		__set_current_state(TASK_RUNNING);
+	}
+
+	remove_wait_queue(&th->wait, &wait);
+
+	return 0;
+}
+
+/*
+ * Per-request state of an aio_sendfile()/aio_sendfile_path() operation,
+ * stored in kaio_req->priv.
+ */
+struct kaio_private
+{
+ union {
+ void *sptr; /* source: struct file being read */
+ __u64 sdata;
+ };
+ union {
+ void *dptr; /* destination: struct socket */
+ __u64 ddata;
+ };
+ void __user *header; /* header bytes not sent yet */
+ size_t header_size; /* number of header bytes left to send */
+ __u64 offset, processed; /* next file offset to read in / file offset sent so far */
+ __u64 count, limit; /* bytes left to send / window passed to max_sane_readahead() */
+ struct kevent_user *kevent_user; /* set in kevent_aio_enqueue() */
+};
+
+extern void bio_fs_destructor(struct bio *bio);
+
+/*
+ * Destructor installed on BIOs allocated by kaio_mpage_alloc():
+ * optional debug output, then the default bio_fs_destructor().
+ */
+static void kaio_bio_destructor(struct bio *bio)
+{
+ dprintk("%s: bio=%p, num=%u.\n", __func__, bio, bio->bi_vcnt);
+ bio_fs_destructor(bio);
+}
+
+static int kaio_read_send_pages(struct kaio_req *req, int direct);
+
+/*
+ * Read completion handler for BIOs submitted by kaio_mpage_bio_submit().
+ *
+ * Returns 1 without doing anything while bio->bi_size is still non-zero.
+ * Otherwise walks the BIO's pages from last to first, marks each one
+ * up to date (or in error) and unlocks it, then appends
+ * kaio_read_send_pages() to the owning request (bio->bi_private), drops
+ * one request reference (presumably the one taken in kaio_mpage_alloc())
+ * and reschedules the request.  Returns 0 in that case.
+ */
+static int kaio_mpage_end_io_read(struct bio *bio, unsigned int bytes_done, int err)
+{
+ const int uptodate = test_bit(BIO_UPTODATE, &bio->bi_flags);
+ struct bio_vec *bvec = bio->bi_io_vec + bio->bi_vcnt - 1;
+ struct kaio_req *req = bio->bi_private;
+
+ if (bio->bi_size)
+ return 1;
+
+ do {
+ struct page *page = bvec->bv_page;
+
+ if (--bvec >= bio->bi_io_vec)
+ prefetchw(&bvec->bv_page->flags);
+
+ if (uptodate) {
+ SetPageUptodate(page);
+ } else {
+ ClearPageUptodate(page);
+ SetPageError(page);
+ }
+ unlock_page(page);
+ } while (bvec >= bio->bi_io_vec);
+
+ dprintk("%s: bio: %p, req: %p, pending: %d.\n",
+ __func__, bio, req, test_bit(KAIO_REQ_PENDING, &req->flags));
+
+ /* NOTE(review): the -EOVERFLOW result of kaio_append_call() is ignored here. */
+ kaio_append_call(req, kaio_read_send_pages);
+ kaio_req_put(req);
+ kaio_schedule_req(req);
+
+ bio_put(bio);
+ return 0;
+}
+
+/*
+ * Submit callback handed to __mpage_readpages(): installs
+ * kaio_mpage_end_io_read() as completion handler and submits @bio as a
+ * READ.  @rw is not looked at.  Always returns NULL, which the caller
+ * stores as its current BIO.
+ */
+struct bio *kaio_mpage_bio_submit(int rw, struct bio *bio)
+{
+ if (bio) {
+ bio->bi_end_io = kaio_mpage_end_io_read;
+ dprintk("%s: bio=%p, num=%u.\n", __func__, bio, bio->bi_vcnt);
+ submit_bio(READ, bio);
+ }
+ return NULL;
+}
+
+/*
+ * Allocation callback handed to __mpage_readpages().
+ *
+ * Allocates a BIO with room for @nr_vecs vectors; if that fails while the
+ * task has PF_MEMALLOC set, retries with @nr_vecs halved until it succeeds
+ * or reaches zero.  On success the BIO is bound to @bdev/@first_sector,
+ * @priv (a struct kaio_req) is stored in bi_private and one reference is
+ * taken on that request.  Returns the BIO or NULL.
+ */
+struct bio *kaio_mpage_alloc(struct block_device *bdev,
+ sector_t first_sector, int nr_vecs, gfp_t gfp_flags, void *priv)
+{
+ struct bio *bio;
+
+ bio = bio_alloc(gfp_flags, nr_vecs);
+
+ if (bio == NULL && (current->flags & PF_MEMALLOC)) {
+ while (!bio && (nr_vecs /= 2))
+ bio = bio_alloc(gfp_flags, nr_vecs);
+ }
+
+ if (bio) {
+ struct kaio_req *req = priv;
+
+ bio->bi_bdev = bdev;
+ bio->bi_sector = first_sector;
+ bio->bi_private = priv;
+ bio->bi_destructor = kaio_bio_destructor;
+ kaio_req_get(req);
+ dprintk("%s: bio: %p, req: %p, num: %d.\n", __func__, bio, priv, nr_vecs);
+ }
+ return bio;
+}
+
+/*
+ * Actor for kaio_vfs_read(): pushes @len bytes of @page to the
+ * destination socket through ->sendpage(), starting at the offset inside
+ * the page which corresponds to the current file position
+ * (priv->processed), so unaligned start offsets and short sends are
+ * continued from the right byte.
+ * Returns whatever ->sendpage() returns.
+ */
+static ssize_t kaio_vfs_read_actor(struct kaio_private *priv, struct page *page, size_t len)
+{
+	struct socket *sock = priv->dptr;
+	struct file *file = sock->file;
+	int offset = priv->processed & (PAGE_CACHE_SIZE - 1);
+
+	return file->f_op->sendpage(file, page, offset, len, &file->f_pos, 1);
+}
+
+/*
+ * Feed pages which are already up to date in the page cache to @actor,
+ * starting at file position priv->processed.
+ *
+ * priv->count is recomputed as the distance from priv->processed to the
+ * end of the file; priv->processed and priv->count are advanced by what
+ * the actor reports as consumed.  Processing stops at the first page
+ * which is missing from the cache or not up to date.
+ *
+ * Returns a negative actor error, otherwise the number of loop iterations
+ * performed (the full page count if nothing is left to send; 0 if the
+ * position is already at or beyond the end of the file).
+ */
+static int kaio_vfs_read(struct kaio_private *priv,
+		ssize_t (*actor)(struct kaio_private *, struct page *, size_t))
+{
+	struct address_space *mapping;
+	struct file *file = priv->sptr;
+	ssize_t actor_size;
+	loff_t isize;
+	int i = 0, pg_num;
+
+	mapping = file->f_mapping;
+	isize = i_size_read(file->f_dentry->d_inode);
+
+	if (priv->processed >= isize) {
+		priv->count = 0;
+		return 0;
+	}
+	priv->count = isize - priv->processed;
+	pg_num = ALIGN(min_t(u64, isize, priv->count), PAGE_SIZE) >> PAGE_SHIFT;
+
+	dprintk("%s: start: priv: %p, ret: %d, num: %d, count: %Lu, offset: %Lu, processed: %Lu.\n",
+			__func__, priv, i, pg_num, priv->count, priv->offset, priv->processed);
+
+	for (i=0; i<pg_num && priv->count; ++i) {
+		struct page *page;
+		/* Only the part of the page behind the current position is left. */
+		size_t nr = PAGE_CACHE_SIZE - (priv->processed & (PAGE_CACHE_SIZE - 1));
+
+		page = find_get_page(mapping, priv->processed >> PAGE_CACHE_SHIFT);
+		if (unlikely(page == NULL))
+			break;
+		if (!PageUptodate(page)) {
+			dprintk("%s: %2d: page=%p, processed=%Lu, count=%Lu not uptodate.\n",
+					__func__, i, page, priv->processed, priv->count);
+			page_cache_release(page);
+			break;
+		}
+
+		if (mapping_writably_mapped(mapping))
+			flush_dcache_page(page);
+
+		mark_page_accessed(page);
+
+		/* Never send beyond the end of the file or the remaining count. */
+		if (nr + priv->processed > isize)
+			nr = isize - priv->processed;
+		if (nr > priv->count)
+			nr = priv->count;
+
+		actor_size = actor(priv, page, nr);
+		page_cache_release(page);
+		if (actor_size < 0) {
+			i = (int)actor_size;
+			break;
+		}
+
+		priv->processed += actor_size;
+		priv->count -= actor_size;
+	}
+
+	if (!priv->count)
+		i = pg_num;
+
+	dprintk("%s: end: priv: %p, ret: %d, num: %d, count: %Lu, offset: %Lu, processed: %Lu.\n",
+			__func__, priv, i, pg_num, priv->count, priv->offset, priv->processed);
+
+	return i;
+}
+
+/*
+ * This function sends a header if it is available.
+ * Zero is returned if it was sent correctly,
+ * in case of fatal error negative value is returned,
+ * and positive one, if -EAGAIN was returned by underlying socket layer.
+ *
+ * priv->header and priv->header_size are advanced by the amount sent, so
+ * a later call continues where this one stopped.
+ */
+static int kaio_send_header(struct kaio_private *priv)
+{
+	int err;
+	struct socket *sock = priv->dptr;
+	struct iovec iov;
+	struct msghdr msg;
+
+	memset(&msg, 0, sizeof(struct msghdr));
+	msg.msg_iov = &iov;
+	msg.msg_iovlen = 1;
+
+	while (priv->header_size) {
+		iov.iov_base = priv->header;
+		iov.iov_len = priv->header_size;
+
+		err = sock_sendmsg(sock, &msg, priv->header_size);
+
+		/*
+		 * Not fatal: report "try again" as a positive value, which
+		 * is what kaio_read_send_pages() checks for.
+		 */
+		if (err == -EAGAIN)
+			return 1;
+		if (err < 0)
+			return err;
+		/* Nothing sent and no error: treat as failure to avoid spinning. */
+		if (err == 0)
+			return -1;
+
+		priv->header += err;
+		priv->header_size -= err;
+	}
+
+	return 0;
+}
+
+/*
+ * State machine step of aio_sendfile(), run as a kaio callback.
+ *
+ * 1. Sends the pending header, if any.
+ * 2. Sends whatever is already up to date in the page cache via
+ *    kaio_vfs_read() and adapts priv->limit (halved when nothing was sent,
+ *    doubled otherwise).
+ * 3. If nothing is left, signals completion through
+ *    kevent_storage_ready(); otherwise allocates new pages starting at
+ *    priv->offset and submits them with ->aio_readpages().
+ *
+ * Returns a negative error, 0 when the next callback of the request may
+ * run, or 1 after reads were submitted.  @direct is not used.
+ */
+static int kaio_read_send_pages(struct kaio_req *req, int direct)
+{
+ struct kaio_private *priv = req->priv;
+ struct file *file = priv->sptr;
+ struct address_space *mapping = file->f_mapping;
+ struct page *page;
+ int err = 0, i, num;
+ u64 offset;
+ LIST_HEAD(page_pool);
+
+ if (priv->header_size) {
+ err = kaio_send_header(priv);
+ if (err < 0)
+ return err;
+ }
+
+ /* Header could not be sent completely right now: run this callback again. */
+ if (err > 0) {
+ kaio_append_call(req, kaio_read_send_pages);
+ return 0;
+ }
+
+ err = kaio_vfs_read(priv, &kaio_vfs_read_actor);
+ if (err < 0)
+ return err;
+
+ if (err == 0) {
+ priv->limit >>= 1;
+ } else {
+ if (priv->limit)
+ priv->limit <<= 1;
+ else
+ priv->limit = 8;
+ }
+
+ /* Never schedule reads for data which has already been sent. */
+ if (priv->offset < priv->processed)
+ priv->offset = priv->processed;
+
+ if (!priv->count) {
+ kevent_storage_ready(&priv->kevent_user->st, NULL, KEVENT_MASK_ALL);
+ return 0;
+ }
+
+ /* Reads up to the end of the data were already scheduled, only sending is left. */
+ if (priv->offset >= priv->processed + priv->count) {
+ kaio_append_call(req, kaio_read_send_pages);
+ return 0;
+ }
+
+ num = min_t(int, max_sane_readahead(priv->limit),
+ ALIGN(priv->count, PAGE_SIZE) >> PAGE_SHIFT);
+
+ offset = priv->offset;
+ for (i=0; i<num; ++i) {
+ page = page_cache_alloc_cold(mapping);
+ if (!page)
+ break;
+
+ page->index = priv->offset >> PAGE_CACHE_SHIFT;
+ list_add(&page->lru, &page_pool);
+
+ priv->offset += PAGE_CACHE_SIZE;
+ }
+
+ dprintk("%s: submit: req: %p, priv: %p, offset: %Lu, num: %d, limit: %Lu.\n",
+ __func__, req, priv, offset, i, priv->limit);
+
+ /* NOTE(review): ->aio_readpages is called without a NULL check - confirm every mapping reaching this point provides it. */
+ err = mapping->a_ops->aio_readpages(file, mapping, &page_pool, i, req);
+ if (err) {
+ dprintk("%s: kevent_mpage_readpages failed: err=%d, count=%Lu.\n",
+ __func__, err, priv->count);
+ kaio_schedule_req(req);
+ return err;
+ }
+
+ return 1;
+}
+
+/*
+ * Register a one-shot KEVENT_AIO event for @req in the kevent queue
+ * referred to by descriptor @fd.
+ *
+ * The event id is built from the current CPU number and a per-cpu
+ * counter; @req is stored in the event's ptr field.  On success an
+ * additional reference on the kevent user is taken.
+ *
+ * Returns 0 on success, -EBADF if @fd is not an open descriptor, -EINVAL
+ * if it is not a kevent descriptor, or the error from
+ * kevent_user_add_ukevent().
+ */
+static int kaio_add_kevent(int fd, struct kaio_req *req)
+{
+ struct ukevent uk;
+ struct file *file;
+ struct kevent_user *u;
+ int err, need_fput = 0;
+ u32 *cnt;
+
+ file = fget_light(fd, &need_fput);
+ if (!file) {
+ err = -EBADF;
+ goto err_out;
+ }
+
+ if (file->f_op != &kevent_user_fops) {
+ err = -EINVAL;
+ goto err_out_fput;
+ }
+
+ u = file->private_data;
+
+ memset(&uk, 0, sizeof(struct ukevent));
+
+ uk.event = KEVENT_MASK_ALL;
+ uk.type = KEVENT_AIO;
+
+ /* CPU number and per-cpu counter must be read on the same CPU. */
+ preempt_disable();
+ uk.id.raw[0] = smp_processor_id();
+ cnt = &__get_cpu_var(kaio_req_counter);
+ uk.id.raw[1] = *cnt;
+ *cnt = *cnt + 1;
+ preempt_enable();
+
+ uk.req_flags = KEVENT_REQ_ONESHOT | KEVENT_REQ_ALWAYS_QUEUE;
+ uk.ptr = req;
+
+ err = kevent_user_add_ukevent(&uk, u);
+ if (err)
+ goto err_out_fput;
+
+ kevent_user_get(u);
+
+ fput_light(file, need_fput);
+
+ return 0;
+
+err_out_fput:
+ fput_light(file, need_fput);
+err_out:
+ return err;
+}
+
+/*
+ * Request destructor installed by kaio_sendfile(): releases the source
+ * file and the destination socket, marks the kevent storage ready, drops
+ * the kevent user reference and frees the private area.
+ */
+static void kaio_destructor(struct kaio_req *req)
+{
+ struct kaio_private *priv = req->priv;
+ struct socket *sock = priv->dptr;
+ struct file *file = priv->sptr;
+
+ fput(file);
+ sockfd_put(sock);
+
+ kevent_storage_ready(&priv->kevent_user->st, NULL, KEVENT_MASK_ALL);
+ kevent_user_put(priv->kevent_user);
+
+ kmem_cache_free(kaio_priv_cache, req->priv);
+}
+
+/*
+ * Common part of sys_aio_sendfile() and sys_aio_sendfile_path().
+ *
+ * Builds a request which sends an optional user space @header
+ * (@header_size bytes) followed by data of @file starting at @offset to
+ * the socket @sock_fd, registers it in the kevent queue @kevent_fd and
+ * schedules it for processing.
+ *
+ * On success the request takes over the caller's reference on @file (it
+ * is dropped in kaio_destructor()).  On failure NULL is returned and
+ * @file is left untouched.
+ */
+static struct kaio_req *kaio_sendfile(int kevent_fd, int sock_fd,
+		void __user *header, size_t header_size,
+		struct file *file, off_t offset, size_t count)
+{
+	struct kaio_req *req;
+	struct socket *sock;
+	struct kaio_private *priv;
+	int err;
+
+	/*
+	 * kaio_read_send_pages() calls ->aio_readpages() unconditionally,
+	 * so refuse mappings which do not implement it.
+	 */
+	if (!file->f_mapping || !file->f_mapping->a_ops ||
+	    !file->f_mapping->a_ops->aio_readpages)
+		goto err_out_exit;
+
+	sock = sockfd_lookup(sock_fd, &err);
+	if (!sock)
+		goto err_out_exit;
+
+	priv = kmem_cache_alloc(kaio_priv_cache, GFP_KERNEL);
+	if (!priv)
+		goto err_out_sput;
+
+	priv->sptr = file;
+	priv->dptr = sock;
+	priv->offset = offset;
+	priv->count = min_t(u64, i_size_read(file->f_dentry->d_inode), count);
+	priv->processed = offset;
+	priv->limit = 128;
+	priv->header = header;
+	priv->header_size = header_size;
+	/* Filled in by kevent_aio_enqueue(); the slab object is not zeroed. */
+	priv->kevent_user = NULL;
+
+	req = kaio_add_call(NULL, &kaio_read_send_pages, -1, GFP_KERNEL);
+	if (!req)
+		goto err_out_free;
+
+	req->destructor = kaio_destructor;
+	req->priv = priv;
+
+	err = kaio_add_kevent(kevent_fd, req);
+
+	dprintk("%s: req: %p, priv: %p, call: %p [%p], offset: %Lu, processed: %Lu, count: %Lu, err: %d.\n",
+			__func__, req, priv, &kaio_read_send_pages,
+			kaio_read_send_pages, priv->offset, priv->processed, priv->count, err);
+
+	if (err)
+		goto err_out_remove;
+
+	kaio_schedule_req(req);
+
+	return req;
+
+err_out_remove:
+	/* It is safe to just free the object since it is guaranteed that it was not
+	 * queued for processing.
+	 */
+	kmem_cache_free(kaio_req_cache, req);
+err_out_free:
+	kmem_cache_free(kaio_priv_cache, priv);
+err_out_sput:
+	sockfd_put(sock);
+err_out_exit:
+	return NULL;
+
+}
+
+/*
+ * aio_sendfile() system call: asynchronously send data of the already
+ * opened file @in_fd, starting at @offset, to socket @sock_fd; the
+ * request is registered in the kevent queue @kevent_fd.
+ *
+ * Returns -EBADF if @in_fd is not an open descriptor and -EINVAL if the
+ * request could not be set up.
+ * NOTE(review): on success the kernel address of the request is returned
+ * cast to long (the same value is stored in the ukevent's ptr field by
+ * kaio_add_kevent()) - confirm exposing it to user space is intended.
+ */
+asmlinkage long sys_aio_sendfile(int kevent_fd, int sock_fd, int in_fd, off_t offset, size_t count)
+{
+ struct kaio_req *req;
+ struct file *file;
+ int err;
+
+ file = fget(in_fd);
+ if (!file) {
+ err = -EBADF;
+ goto err_out_exit;
+ }
+
+ /* On success the reference taken by fget() stays with the request. */
+ req = kaio_sendfile(kevent_fd, sock_fd, NULL, 0, file, offset, count);
+ if (!req) {
+ err = -EINVAL;
+ goto err_out_fput;
+ }
+
+ return (long)req;
+
+err_out_fput:
+ fput(file);
+err_out_exit:
+ return err;
+}
+
+/*
+ * aio_sendfile_path() system call: open @filename read-only, then
+ * asynchronously send an optional @header (@header_size bytes) followed
+ * by the file's data starting at @offset to socket @sock_fd.  The request
+ * is registered in the kevent queue @kevent_fd.
+ *
+ * Returns the new file descriptor of the opened file on success and a
+ * negative error code otherwise.
+ */
+asmlinkage long sys_aio_sendfile_path(int kevent_fd, int sock_fd,
+		void __user *header, size_t header_size,
+		char __user *filename, off_t offset, size_t count)
+{
+	char *tmp = getname(filename);
+	int flags = O_RDONLY, namei_flags, err, fd;
+	struct nameidata nd;
+	struct file *file;
+	struct kaio_req *req;
+
+	if (IS_ERR(tmp)) {
+		err = PTR_ERR(tmp);
+		goto err_out_exit;
+	}
+
+	if (force_o_largefile())
+		flags |= O_LARGEFILE;
+
+	fd = get_unused_fd();
+	if (fd < 0) {
+		err = fd;
+		goto err_out_put_name;
+	}
+
+	/*
+	 * open_namei() wants the access mode in its "+1" encoding, while the
+	 * file itself has to be opened with the unmodified flags, otherwise
+	 * O_RDONLY + 1 would be treated as O_WRONLY.
+	 */
+	namei_flags = flags;
+	if ((namei_flags+1) & O_ACCMODE)
+		namei_flags++;
+
+	err = open_namei(AT_FDCWD, tmp, namei_flags, 0400, &nd);
+	if (err)
+		goto err_out_fdput;
+
+	/* Failure is reported as an error pointer, not as NULL. */
+	file = nameidata_to_filp(&nd, flags);
+	if (IS_ERR(file)) {
+		err = PTR_ERR(file);
+		goto err_out_fdput;
+	}
+
+	/* One reference will be released in sys_close(),
+	 * second one through req->destructor()
+	 */
+	atomic_inc(&file->f_count);
+
+	req = kaio_sendfile(kevent_fd, sock_fd, header, header_size,
+			file, offset, count);
+	if (!req) {
+		err = -EINVAL;
+		goto err_out_fput;
+	}
+
+	fd_install(fd, file);
+
+	/* The name buffer is only needed for the lookup. */
+	putname(tmp);
+
+	return fd;
+
+err_out_fput:
+	/* Drop both references: the one from open and the extra one above. */
+	fput(file);
+	fput(file);
+err_out_fdput:
+	put_unused_fd(fd);
+err_out_put_name:
+	putname(tmp);
+err_out_exit:
+	return err;
+}
+
+/*
+ * kevent ->callback for KEVENT_AIO events.
+ * Returns 1 once the request has no bytes left to send (priv->count is
+ * zero), after copying priv->processed into ret_data as two 32-bit words;
+ * returns 0 otherwise.
+ * NOTE(review): which half of the 64-bit value lands in ret_data[0]
+ * depends on the byte order of the machine.
+ */
+static int kevent_aio_callback(struct kevent *k)
+{
+ struct kaio_req *req = k->event.ptr;
+ struct kaio_private *priv = req->priv;
+
+ if (!priv->count) {
+ __u32 *processed = (__u32 *)&priv->processed;
+ k->event.ret_data[0] = processed[0];
+ k->event.ret_data[1] = processed[1];
+ return 1;
+ }
+
+ return 0;
+}
+
+/*
+ * kevent ->enqueue for KEVENT_AIO events: adds @k to its user's storage
+ * and remembers that user in the request's private area.  The event is
+ * requeued right away when KEVENT_REQ_ALWAYS_QUEUE is set.
+ * Returns 0 or the error from kevent_storage_enqueue().
+ */
+int kevent_aio_enqueue(struct kevent *k)
+{
+ int err;
+ struct kaio_req *req = k->event.ptr;
+ struct kaio_private *priv = req->priv;
+
+ err = kevent_storage_enqueue(&k->user->st, k);
+ if (err)
+ goto err_out_exit;
+
+ priv->kevent_user = k->user;
+ if (k->event.req_flags & KEVENT_REQ_ALWAYS_QUEUE)
+ kevent_requeue(k);
+
+ return 0;
+
+err_out_exit:
+ return err;
+}
+
+/*
+ * kevent ->dequeue for KEVENT_AIO events: removes @k from its storage.
+ * Always returns 0.
+ */
+int kevent_aio_dequeue(struct kevent *k)
+{
+ kevent_storage_dequeue(k->st, k);
+
+ return 0;
+}
+
+/*
+ * Stop the worker's kernel thread and free its descriptor.
+ * NOTE(review): th->refcnt is not consulted before freeing.
+ */
+static void kaio_thread_stop(struct kaio_thread *th)
+{
+ kthread_stop(th->thread);
+ kfree(th);
+}
+
+/*
+ * Allocate and initialize a worker descriptor, start its kernel thread
+ * named "kaio/<num>" and publish the descriptor through @thp.
+ * Returns 0 on success, -ENOMEM if the descriptor cannot be allocated, or
+ * the error reported by kthread_run().
+ */
+static int kaio_thread_start(struct kaio_thread **thp, int num)
+{
+ struct kaio_thread *th;
+
+ th = kzalloc(sizeof(struct kaio_thread), GFP_KERNEL);
+ if (!th)
+ return -ENOMEM;
+
+ th->refcnt = 1;
+ spin_lock_init(&th->req_lock);
+ INIT_LIST_HEAD(&th->req_list);
+ init_waitqueue_head(&th->wait);
+
+ th->thread = kthread_run(kaio_thread_process, th, "kaio/%d", num);
+ if (IS_ERR(th->thread)) {
+ int err = PTR_ERR(th->thread);
+ kfree(th);
+ return err;
+ }
+
+ /* Publish the fully initialized descriptor. */
+ *thp = th;
+ wmb();
+
+ return 0;
+}
+
+/*
+ * Subsystem initialization: creates the slab caches for requests and
+ * their private areas, starts KAIO_THREAD_NUM worker threads and
+ * registers the KEVENT_AIO callbacks.
+ *
+ * On failure every thread started so far is stopped, the caches are
+ * destroyed again and the error is returned.
+ */
+static int __init kevent_init_aio(void)
+{
+	struct kevent_callbacks sc = {
+		.callback = &kevent_aio_callback,
+		.enqueue = &kevent_aio_enqueue,
+		.dequeue = &kevent_aio_dequeue,
+		.flags = 0,
+	};
+	int err, i;
+
+	/* SLAB_PANIC: cache creation does not return on failure. */
+	kaio_req_cache = kmem_cache_create("kaio_req", sizeof(struct kaio_req),
+			0, SLAB_PANIC, NULL, NULL);
+	kaio_priv_cache = kmem_cache_create("kaio_priv", sizeof(struct kaio_private),
+			0, SLAB_PANIC, NULL, NULL);
+
+	memset(kaio_threads, 0, sizeof(kaio_threads));
+
+	for (i=0; i<KAIO_THREAD_NUM; ++i) {
+		err = kaio_thread_start(&kaio_threads[i], i);
+		if (err)
+			goto err_out_stop;
+	}
+
+	err = kevent_add_callbacks(&sc, KEVENT_AIO);
+	if (err)
+		goto err_out_stop;
+
+	return 0;
+
+err_out_stop:
+	/* Threads 0 .. i-1 are running; unpublish each one before stopping it. */
+	while (--i >= 0) {
+		struct kaio_thread *th = kaio_threads[i];
+
+		kaio_threads[i] = NULL;
+		wmb();
+
+		kaio_thread_stop(th);
+	}
+
+	/* No request can exist at this point, so the caches are empty. */
+	kmem_cache_destroy(kaio_priv_cache);
+	kaio_priv_cache = NULL;
+	kmem_cache_destroy(kaio_req_cache);
+	kaio_req_cache = NULL;
+
+	return err;
+}
+module_init(kevent_init_aio);

-
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at http://vger.kernel.org/majordomo-info.html
Please read the FAQ at http://www.tux.org/lkml/