[PATCH,BETA] new pipe code

From: Manfred Spraul (manfreds@colorfullife.com)
Date: Tue Feb 29 2000 - 16:52:48 EST


I've rewritten the pipe code:
the old code caused lots of reschedules under certain loads. The new
code is not yet fully tested (esp. O_NONBLOCK is untested), but I'm
really interested in what you think about it.

I've described the new locking in <linux/pipe_fs_i.h>.

--
	Manfred

// $Header$ // Kernel Version: // VERSION = 2 // PATCHLEVEL = 3 // SUBLEVEL = 48 // EXTRAVERSION = --- 2.3/fs/pipe.c Sun Feb 27 08:57:11 2000 +++ build-2.3/fs/pipe.c Tue Feb 29 22:36:02 2000 @@ -29,10 +29,13 @@ * * Reads with count = 0 should always return 0. * -- Julian Bradfield 1999-06-07. + * + * new locking code + * (c) 2000 Manfred Spraul <manfreds@colorfullife.com> */ /* Drop the inode semaphore and wait for a pipe event, atomically */ -static void pipe_wait(struct inode * inode) +void pipe_wait(struct inode * inode) { DECLARE_WAITQUEUE(wait, current); current->state = TASK_INTERRUPTIBLE; @@ -47,7 +50,8 @@ pipe_read(struct file *filp, char *buf, size_t count, loff_t *ppos) { struct inode *inode = filp->f_dentry->d_inode; - ssize_t size, read, ret; + struct pipe_readerdesc pr; + ssize_t read, ret; /* Seeks are not allowed on pipes. */ ret = -ESPIPE; @@ -59,82 +63,91 @@ ret = 0; if (count == 0) goto out_nolock; + pr.tsk = current; - /* Get the pipe semaphore */ - ret = -ERESTARTSYS; - if (down_interruptible(PIPE_SEM(*inode))) - goto out_nolock; - - if (PIPE_EMPTY(*inode)) { -do_more_read: - ret = 0; + spin_lock(PIPE_LOCK(*inode)); + list_add_tail(&pr.list, PIPE_RLIST(*inode)); + for(;;) { + if(PIPE_RLIST(*inode)->next == &pr.list) + break; + if (filp->f_flags & O_NONBLOCK) { + /* FIXME: it's possible that data is available.*/ + ret = -EAGAIN; + goto out_unlink; + } + current->state=TASK_INTERRUPTIBLE; + spin_unlock(PIPE_LOCK(*inode)); + schedule(); + spin_lock(PIPE_LOCK(*inode)); + ret = -ERESTARTSYS; + if (signal_pending(current)) + goto out_unlink; + } + for(;;) { + int avail=PIPE_SIZE-PIPE_FREE(*inode); + if(avail != 0) { + int start; + if(avail>count) + avail=count; + /* transfer data*/ + start = PIPE_START(*inode); + spin_unlock(PIPE_LOCK(*inode)); + + if(start+avail<=PIPE_SIZE) { + ret = copy_to_user(buf, PIPE_BASE(*inode)+start, avail); + } else { + int p1=PIPE_SIZE-start; + ret = copy_to_user(buf, PIPE_BASE(*inode)+start, p1); + ret |= 
copy_to_user(buf+p1, PIPE_BASE(*inode), avail-p1); + } + spin_lock(PIPE_LOCK(*inode)); + if(ret!=0) { + read=0; + ret = -EFAULT; + goto out_unlink; + } + read += avail; + buf += avail; + count-=avail; + PIPE_LEN(*inode) -= avail; + /* Cache behaviour optimization */ + if (!PIPE_LEN(*inode) && list_empty(PIPE_WLIST(*inode))) { + PIPE_START(*inode) = 0; + } else { + PIPE_START(*inode) = (start+avail)&(PIPE_SIZE-1); + } + /* wake up polling threads*/ + wake_up(PIPE_WAIT(*inode)); + /* wake up writing threads*/ + if(!list_empty(PIPE_WLIST(*inode))) + wake_up_process(list_entry(PIPE_WLIST(*inode)->next,struct pipe_writerdesc,list)->tsk); + } + if(count<=0) + break; + /* wait for more data*/ if (!PIPE_WRITERS(*inode)) - goto out; - - ret = -EAGAIN; - if (filp->f_flags & O_NONBLOCK) - goto out; - - for (;;) { - PIPE_WAITING_READERS(*inode)++; - pipe_wait(inode); - PIPE_WAITING_READERS(*inode)--; - ret = -ERESTARTSYS; - if (signal_pending(current)) - goto out_nolock; - if (down_interruptible(PIPE_SEM(*inode))) - goto out_nolock; - ret = 0; - if (!PIPE_EMPTY(*inode)) - break; - if (!PIPE_WRITERS(*inode)) - goto out; + break; + if (filp->f_flags & O_NONBLOCK) { + ret = -EAGAIN; + break; } + current->state=TASK_INTERRUPTIBLE; + spin_unlock(PIPE_LOCK(*inode)); + schedule(); + spin_lock(PIPE_LOCK(*inode)); + ret = -ERESTARTSYS; + if (signal_pending(current)) + goto out_unlink; } - /* Read what data is available. 
*/ - ret = -EFAULT; - while (count > 0 && (size = PIPE_LEN(*inode))) { - char *pipebuf = PIPE_BASE(*inode) + PIPE_START(*inode); - ssize_t chars = PIPE_MAX_RCHUNK(*inode); - - if (chars > count) - chars = count; - if (chars > size) - chars = size; - - if (copy_to_user(buf, pipebuf, chars)) - goto out; - - read += chars; - PIPE_START(*inode) += chars; - PIPE_START(*inode) &= (PIPE_SIZE - 1); - PIPE_LEN(*inode) -= chars; - count -= chars; - buf += chars; - } - - /* Cache behaviour optimization */ - if (!PIPE_LEN(*inode)) - PIPE_START(*inode) = 0; - - if (count && PIPE_WAITING_WRITERS(*inode) && !(filp->f_flags & O_NONBLOCK)) { - /* - * We know that we are going to sleep: signal - * writers synchronously that there is more - * room. - */ - wake_up_interruptible_sync(PIPE_WAIT(*inode)); - if (!PIPE_EMPTY(*inode)) - BUG(); - goto do_more_read; +out_unlink: + if ((PIPE_RLIST(*inode)->next == &pr.list)&& + (PIPE_RLIST(*inode)->prev != &pr.list)) { + wake_up_process(list_entry(pr.list.next,struct pipe_readerdesc,list)->tsk); } - /* Signal writers asynchronously that there is more room. */ - wake_up_interruptible(PIPE_WAIT(*inode)); + list_del(&pr.list); - ret = read; -out: - up(PIPE_SEM(*inode)); + spin_unlock(PIPE_LOCK(*inode)); out_nolock: if (read) ret = read; @@ -145,7 +158,8 @@ pipe_write(struct file *filp, const char *buf, size_t count, loff_t *ppos) { struct inode *inode = filp->f_dentry->d_inode; - ssize_t free, written, ret; + struct pipe_writerdesc pw; + ssize_t minfree, written, ret; /* Seeks are not allowed on pipes. */ ret = -ESPIPE; @@ -158,100 +172,95 @@ if (count == 0) goto out_nolock; - ret = -ERESTARTSYS; - if (down_interruptible(PIPE_SEM(*inode))) - goto out_nolock; - -do_more_write: - /* No readers yields SIGPIPE. */ - if (!PIPE_READERS(*inode)) - goto sigpipe; - - /* If count <= PIPE_BUF, we have to make it atomic. */ - free = (count <= PIPE_BUF ? count : 1); - - /* Wait, or check for, available space. 
*/ - if (filp->f_flags & O_NONBLOCK) { - ret = -EAGAIN; - if (PIPE_FREE(*inode) < free) - goto out; - } else { - while (PIPE_FREE(*inode) < free) { - PIPE_WAITING_WRITERS(*inode)++; - pipe_wait(inode); - PIPE_WAITING_WRITERS(*inode)--; - ret = -ERESTARTSYS; - if (signal_pending(current)) - goto out_nolock; - if (down_interruptible(PIPE_SEM(*inode))) - goto out_nolock; - - if (!PIPE_READERS(*inode)) - goto sigpipe; + pw.tsk = current; + minfree=1; + if (count <= PIPE_BUF) + minfree=count; + + spin_lock(PIPE_LOCK(*inode)); + list_add_tail(&pw.list, PIPE_WLIST(*inode)); + for(;;) { + if (!PIPE_READERS(*inode)) + goto sigpipe; + if(PIPE_WLIST(*inode)->next == &pw.list) + break; + if (filp->f_flags & O_NONBLOCK) { + /* FIXME: it's possible that data is available.*/ + ret = -EAGAIN; + goto out_unlink; } + current->state=TASK_INTERRUPTIBLE; + spin_unlock(PIPE_LOCK(*inode)); + schedule(); + spin_lock(PIPE_LOCK(*inode)); + ret = -ERESTARTSYS; + if (signal_pending(current)) + goto out_unlink; } - - /* Copy into available space. 
*/ - ret = -EFAULT; - while (count > 0) { - int space; - char *pipebuf = PIPE_BASE(*inode) + PIPE_END(*inode); - ssize_t chars = PIPE_MAX_WCHUNK(*inode); - - if ((space = PIPE_FREE(*inode)) != 0) { - if (chars > count) - chars = count; - if (chars > space) - chars = space; - - if (copy_from_user(pipebuf, buf, chars)) - goto out; - - written += chars; - PIPE_LEN(*inode) += chars; - count -= chars; - buf += chars; - space = PIPE_FREE(*inode); - continue; + for(;;) { + int free=PIPE_FREE(*inode); + if(free>=minfree) { + int start; + if(free>count) + free=count; + /* transfer data*/ + start = PIPE_END(*inode); + spin_unlock(PIPE_LOCK(*inode)); + + if(start+free<=PIPE_SIZE) { + ret = copy_from_user(PIPE_BASE(*inode)+start, buf, free); + } else { + int p1=PIPE_SIZE-start; + + ret = copy_from_user(PIPE_BASE(*inode)+start, buf, p1); + ret |= copy_from_user(PIPE_BASE(*inode), buf+p1, free-p1); + } + + spin_lock(PIPE_LOCK(*inode)); + if(ret!=0) { + written=0; + ret = -EFAULT; + goto out_unlink; + } + written += free; + buf += free; + count-=free; + PIPE_LEN(*inode) += free; + /* wake up polling threads*/ + wake_up(PIPE_WAIT(*inode)); + /* wake up reading threads*/ + if(!list_empty(PIPE_RLIST(*inode))) + wake_up_process(list_entry(PIPE_RLIST(*inode)->next,struct pipe_readerdesc,list)->tsk); } - - ret = written; - if (filp->f_flags & O_NONBLOCK) + if(count<=0) break; - - do { - /* - * Synchronous wake-up: it knows that this process - * is going to give up this CPU, so it doesnt have - * to do idle reschedules. 
- */ - wake_up_interruptible_sync(PIPE_WAIT(*inode)); - PIPE_WAITING_WRITERS(*inode)++; - pipe_wait(inode); - PIPE_WAITING_WRITERS(*inode)--; - if (signal_pending(current)) - goto out_nolock; - if (down_interruptible(PIPE_SEM(*inode))) - goto out_nolock; - if (!PIPE_READERS(*inode)) - goto sigpipe; - } while (!PIPE_FREE(*inode)); - ret = -EFAULT; - } - - if (count && PIPE_WAITING_READERS(*inode) && - !(filp->f_flags & O_NONBLOCK)) { - wake_up_interruptible_sync(PIPE_WAIT(*inode)); - goto do_more_write; + /* wait for more free space*/ + if (!PIPE_READERS(*inode)) + goto sigpipe; + if (filp->f_flags & O_NONBLOCK) { + ret = -EAGAIN; + break; + } + current->state=TASK_INTERRUPTIBLE; + spin_unlock(PIPE_LOCK(*inode)); + schedule(); + spin_lock(PIPE_LOCK(*inode)); + ret = -ERESTARTSYS; + if (signal_pending(current)) + goto out_unlink; } - /* Signal readers asynchronously that there is more data. */ - wake_up_interruptible(PIPE_WAIT(*inode)); - - inode->i_ctime = inode->i_mtime = CURRENT_TIME; - mark_inode_dirty(inode); - -out: - up(PIPE_SEM(*inode)); + + if (written) { + inode->i_ctime = inode->i_mtime = CURRENT_TIME; + mark_inode_dirty(inode); + } +out_unlink: + if ((PIPE_WLIST(*inode)->next == &pw.list) && + (PIPE_WLIST(*inode)->prev != &pw.list)) { + wake_up_process(list_entry(pw.list.next,struct pipe_writerdesc,list)->tsk); + } + list_del(&pw.list); + spin_unlock(PIPE_LOCK(*inode)); out_nolock: if (written) ret = written; @@ -259,10 +268,10 @@ sigpipe: if (written) - goto out; - up(PIPE_SEM(*inode)); + goto out_unlink; + ret=-EPIPE; send_sig(SIGPIPE, current, 0); - return -EPIPE; + goto out_unlink; } static loff_t @@ -381,6 +390,40 @@ return mask; } +void cleanup_pipe_data(struct inode* inode) +{ + struct pipe_inode_info *info = inode->i_pipe; + inode->i_pipe = NULL; + free_page((unsigned long) info->base); + kfree(info); +} + +void pipe_no_readers(struct inode* inode) +{ + wake_up_interruptible(PIPE_WAIT(*inode)); + spin_lock(PIPE_LOCK(*inode)); + 
if(!list_empty(PIPE_WLIST(*inode))) { + struct pipe_writerdesc* pw; + + pw = list_entry(PIPE_WLIST(*inode)->next,struct pipe_writerdesc,list); + wake_up_process(pw->tsk); + } + spin_unlock(PIPE_LOCK(*inode)); +} + +void pipe_no_writers(struct inode* inode) +{ + wake_up_interruptible(PIPE_WAIT(*inode)); + spin_lock(PIPE_LOCK(*inode)); + if(!list_empty(PIPE_RLIST(*inode))) { + struct pipe_readerdesc* pr; + + pr = list_entry(PIPE_RLIST(*inode)->next,struct pipe_readerdesc,list); + wake_up_process(pr->tsk); + } + spin_unlock(PIPE_LOCK(*inode)); +} + static int pipe_release(struct inode *inode, int decr, int decw) { @@ -388,12 +431,15 @@ PIPE_READERS(*inode) -= decr; PIPE_WRITERS(*inode) -= decw; if (!PIPE_READERS(*inode) && !PIPE_WRITERS(*inode)) { - struct pipe_inode_info *info = inode->i_pipe; - inode->i_pipe = NULL; - free_page((unsigned long) info->base); - kfree(info); + /* no readers and no writers means noone uses the pipe. + * FIXME: check if someone could poll the pipe. + */ + cleanup_pipe_data(inode); } else { - wake_up_interruptible(PIPE_WAIT(*inode)); + if(!PIPE_READERS(*inode)) + pipe_no_readers(inode); + if(!PIPE_WRITERS(*inode)) + pipe_no_writers(inode); } up(PIPE_SEM(*inode)); @@ -531,30 +577,43 @@ release: pipe_rdwr_release, }; -static struct inode * get_pipe_inode(void) +int init_pipe_data(struct inode* inode) { - struct inode *inode = get_empty_inode(); unsigned long page; - if (!inode) - goto fail_inode; - page = __get_free_page(GFP_USER); if (!page) - goto fail_iput; + return -ENOMEM; inode->i_pipe = kmalloc(sizeof(struct pipe_inode_info), GFP_KERNEL); - if (!inode->i_pipe) - goto fail_page; - - inode->i_fop = &rdwr_pipe_fops; + if (!inode->i_pipe) { + free_page(page); + return -ENOMEM; + } - init_waitqueue_head(PIPE_WAIT(*inode)); + spin_lock_init(PIPE_LOCK(*inode)); + INIT_LIST_HEAD(PIPE_RLIST(*inode)); + INIT_LIST_HEAD(PIPE_WLIST(*inode)); PIPE_BASE(*inode) = (char *) page; PIPE_START(*inode) = PIPE_LEN(*inode) = 0; + PIPE_READERS(*inode) = 
PIPE_WRITERS(*inode) = 0; + init_waitqueue_head(PIPE_WAIT(*inode)); + + return 0; +} + +static struct inode * get_pipe_inode(void) +{ + struct inode *inode = get_empty_inode(); + + if (!inode) + return NULL; + if(init_pipe_data(inode) < 0) + goto fail_iput; + PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 1; - PIPE_WAITING_READERS(*inode) = PIPE_WAITING_WRITERS(*inode) = 0; + inode->i_fop = &rdwr_pipe_fops; /* * Mark the inode dirty from the very beginning, * that way it will never be moved to the dirty @@ -567,13 +626,11 @@ inode->i_gid = current->fsgid; inode->i_atime = inode->i_mtime = inode->i_ctime = CURRENT_TIME; inode->i_blksize = PAGE_SIZE; + return inode; -fail_page: - free_page(page); fail_iput: iput(inode); -fail_inode: return NULL; } --- 2.3/fs/fifo.c Sun Feb 27 08:57:11 2000 +++ build-2.3/fs/fifo.c Tue Feb 29 20:31:40 2000 @@ -20,27 +20,10 @@ if (down_interruptible(PIPE_SEM(*inode))) goto err_nolock_nocleanup; - if (! inode->i_pipe) { - unsigned long page; - struct pipe_inode_info *info; - - info = kmalloc(sizeof(struct pipe_inode_info),GFP_KERNEL); - - ret = -ENOMEM; - if (!info) + if (!inode->i_pipe) { + ret = init_pipe_data(inode); + if(ret < 0) goto err_nocleanup; - page = __get_free_page(GFP_KERNEL); - if (!page) { - kfree(info); - goto err_nocleanup; - } - - inode->i_pipe = info; - - init_waitqueue_head(PIPE_WAIT(*inode)); - PIPE_BASE(*inode) = (char *) page; - PIPE_START(*inode) = PIPE_LEN(*inode) = 0; - PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 0; } switch (filp->f_mode) { @@ -58,8 +41,7 @@ while (!PIPE_WRITERS(*inode)) { if (signal_pending(current)) goto err_rd; - up(PIPE_SEM(*inode)); - interruptible_sleep_on(PIPE_WAIT(*inode)); + pipe_wait(inode); /* Note that using down_interruptible here and similar places below is pointless, @@ -90,8 +72,8 @@ while (!PIPE_READERS(*inode)) { if (signal_pending(current)) goto err_wr; - up(PIPE_SEM(*inode)); - interruptible_sleep_on(PIPE_WAIT(*inode)); + pipe_wait(inode); + down(PIPE_SEM(*inode)); } 
break; @@ -122,22 +104,20 @@ err_rd: if (!--PIPE_READERS(*inode)) - wake_up_interruptible(PIPE_WAIT(*inode)); + pipe_no_readers(inode); ret = -ERESTARTSYS; goto err; err_wr: if (!--PIPE_WRITERS(*inode)) - wake_up_interruptible(PIPE_WAIT(*inode)); + pipe_no_writers(inode); ret = -ERESTARTSYS; goto err; err: if (!PIPE_READERS(*inode) && !PIPE_WRITERS(*inode)) { - struct pipe_inode_info *info = inode->i_pipe; - inode->i_pipe = NULL; - free_page((unsigned long)info->base); - kfree(info); + /* FIXME: could someone poll this filp? */ + cleanup_pipe_data(inode); } err_nocleanup: --- 2.3/include/linux/pipe_fs_i.h Tue Dec 7 10:48:22 1999 +++ build-2.3/include/linux/pipe_fs_i.h Tue Feb 29 15:53:39 2000 @@ -1,16 +1,34 @@ #ifndef _LINUX_PIPE_FS_I_H #define _LINUX_PIPE_FS_I_H +struct pipe_writerdesc { + struct list_head list; + struct task_struct* tsk; +}; + +struct pipe_readerdesc { + struct list_head list; + struct task_struct* tsk; +}; + struct pipe_inode_info { - wait_queue_head_t wait; + spinlock_t lock; + struct list_head waiting_readers; + struct list_head waiting_writers; char *base; unsigned int start; unsigned int readers; unsigned int writers; - unsigned int waiting_readers; - unsigned int waiting_writers; + + wait_queue_head_t wait; }; +int init_pipe_data (struct inode* inode); +void cleanup_pipe_data (struct inode* inode); +void pipe_wait (struct inode* inode); +void pipe_no_writers (struct inode* inode); +void pipe_no_readers (struct inode* inode); + /* Differs from PIPE_BUF in that PIPE_SIZE is the length of the actual memory allocation, whereas PIPE_BUF makes atomicity guarantees. 
*/ #define PIPE_SIZE PAGE_SIZE @@ -22,8 +40,9 @@ #define PIPE_LEN(inode) ((inode).i_size) #define PIPE_READERS(inode) ((inode).i_pipe->readers) #define PIPE_WRITERS(inode) ((inode).i_pipe->writers) -#define PIPE_WAITING_READERS(inode) ((inode).i_pipe->waiting_readers) -#define PIPE_WAITING_WRITERS(inode) ((inode).i_pipe->waiting_writers) +#define PIPE_LOCK(inode) (&(inode).i_pipe->lock) +#define PIPE_RLIST(inode) (&(inode).i_pipe->waiting_readers) +#define PIPE_WLIST(inode) (&(inode).i_pipe->waiting_writers) #define PIPE_EMPTY(inode) (PIPE_LEN(inode) == 0) #define PIPE_FULL(inode) (PIPE_LEN(inode) == PIPE_SIZE) @@ -32,4 +51,15 @@ #define PIPE_MAX_RCHUNK(inode) (PIPE_SIZE - PIPE_START(inode)) #define PIPE_MAX_WCHUNK(inode) (PIPE_SIZE - PIPE_END(inode)) +/* synchronization: + * PIPE_SEM protects PIPE_READERS, PIPE_WRITERS and i_pipe. + * i_pipe is also changed before fd_install() and after the last + * frip(). + * PIPE_LOCK protects PIPE_RLIST, PIPE_WLIST, PIPE_START, PIPE_LEN. + * new readers, writers are added to the tail of their lists. + * the head of PIPE_{R,W}LIST can {read,write} data without owning + * PIPE_LOCK. + * if a thread decreases PIPE_{READERS,WRITERS} to 0, then it must + * call pipe_no_{readers,writes}. + */ #endif

- To unsubscribe from this list: send the line "unsubscribe linux-kernel" in the body of a message to majordomo@vger.rutgers.edu Please read the FAQ at http://www.tux.org/lkml/



This archive was generated by hypermail 2b29 : Tue Feb 29 2000 - 21:00:23 EST