| From: |
| Linux Kernel Mailing List <linux-kernel@vger.kernel.org> |
| To: |
| bk-commits-head@vger.kernel.org |
| Subject: |
| Make pipe data structure be a circular list of pages, rather than |
| Date: |
| Fri, 07 Jan 2005 00:29:13 +0000 |
| Archive-link: |
| Article, Thread |
ChangeSet 1.2229.1.1, 2005/01/06 16:29:13-08:00, torvalds@ppc970.osdl.org
Make pipe data structure be a circular list of pages, rather than
a circular list of one page.
This improves pipe throughput, and allows us to (eventually)
use these lists of page buffers for moving data around efficiently.
fs/pipe.c | 130 ++++++++++++++++++++++++++++++----------------
include/linux/pipe_fs_i.h | 19 +++---
2 files changed, 97 insertions(+), 52 deletions(-)
diff -Nru a/fs/pipe.c b/fs/pipe.c
--- a/fs/pipe.c 2005-01-06 19:13:23 -08:00
+++ b/fs/pipe.c 2005-01-06 19:13:23 -08:00
@@ -14,6 +14,8 @@
#include <linux/mount.h>
#include <linux/pipe_fs_i.h>
#include <linux/uio.h>
+#include <linux/highmem.h>
+
#include <asm/uaccess.h>
#include <asm/ioctls.h>
@@ -89,6 +91,7 @@
unsigned long nr_segs, loff_t *ppos)
{
struct inode *inode = filp->f_dentry->d_inode;
+ struct pipe_inode_info *info;
int do_wakeup;
ssize_t ret;
struct iovec *iov = (struct iovec *)_iov;
@@ -102,32 +105,40 @@
do_wakeup = 0;
ret = 0;
down(PIPE_SEM(*inode));
+ info = inode->i_pipe;
for (;;) {
- int size = PIPE_LEN(*inode);
- if (size) {
- char *pipebuf = PIPE_BASE(*inode) + PIPE_START(*inode);
- ssize_t chars = PIPE_MAX_RCHUNK(*inode);
+ int bufs = info->nrbufs;
+ if (bufs) {
+ int curbuf = info->curbuf;
+ struct pipe_buffer *buf = info->bufs + curbuf;
+ size_t chars = buf->len;
+ int error;
if (chars > total_len)
chars = total_len;
- if (chars > size)
- chars = size;
- if (pipe_iov_copy_to_user(iov, pipebuf, chars)) {
+ error = pipe_iov_copy_to_user(iov, kmap(buf->page) + buf->offset, chars);
+ kunmap(buf->page);
+ if (unlikely(error)) {
if (!ret) ret = -EFAULT;
break;
}
ret += chars;
-
- PIPE_START(*inode) += chars;
- PIPE_START(*inode) &= (PIPE_SIZE - 1);
- PIPE_LEN(*inode) -= chars;
+ buf->offset += chars;
+ buf->len -= chars;
+ if (!buf->len) {
+ __free_page(buf->page);
+ buf->page = NULL;
+ curbuf = (curbuf + 1) & (PIPE_BUFFERS-1);
+ info->curbuf = curbuf;
+ info->nrbufs = --bufs;
+ do_wakeup = 1;
+ }
total_len -= chars;
- do_wakeup = 1;
if (!total_len)
break; /* common path: read succeeded */
}
- if (PIPE_LEN(*inode)) /* test for cyclic buffers */
+ if (bufs) /* More to do? */
continue;
if (!PIPE_WRITERS(*inode))
break;
@@ -177,8 +188,8 @@
unsigned long nr_segs, loff_t *ppos)
{
struct inode *inode = filp->f_dentry->d_inode;
+ struct pipe_inode_info *info;
ssize_t ret;
- size_t min;
int do_wakeup;
struct iovec *iov = (struct iovec *)_iov;
size_t total_len;
@@ -190,48 +201,58 @@
do_wakeup = 0;
ret = 0;
- min = total_len;
- if (min > PIPE_BUF)
- min = 1;
down(PIPE_SEM(*inode));
+ info = inode->i_pipe;
for (;;) {
- int free;
+ int bufs;
if (!PIPE_READERS(*inode)) {
send_sig(SIGPIPE, current, 0);
if (!ret) ret = -EPIPE;
break;
}
- free = PIPE_FREE(*inode);
- if (free >= min) {
- /* transfer data */
- ssize_t chars = PIPE_MAX_WCHUNK(*inode);
- char *pipebuf = PIPE_BASE(*inode) + PIPE_END(*inode);
+ bufs = info->nrbufs;
+ if (bufs < PIPE_BUFFERS) {
+ ssize_t chars;
+ int newbuf = (info->curbuf + bufs) & (PIPE_BUFFERS-1);
+ struct pipe_buffer *buf = info->bufs + newbuf;
+ struct page *page = alloc_page(GFP_USER);
+ int error;
+
+ if (unlikely(!page)) {
+ ret = ret ? : -ENOMEM;
+ break;
+ }
/* Always wakeup, even if the copy fails. Otherwise
* we lock up (O_NONBLOCK-)readers that sleep due to
* syscall merging.
+ * FIXME! Is this really true?
*/
do_wakeup = 1;
+ chars = PAGE_SIZE;
if (chars > total_len)
chars = total_len;
- if (chars > free)
- chars = free;
- if (pipe_iov_copy_from_user(pipebuf, iov, chars)) {
+ error = pipe_iov_copy_from_user(kmap(page), iov, chars);
+ kunmap(page);
+ if (unlikely(error)) {
if (!ret) ret = -EFAULT;
+ __free_page(page);
break;
}
ret += chars;
- PIPE_LEN(*inode) += chars;
+ /* Insert it into the buffer array */
+ buf->page = page;
+ buf->offset = 0;
+ buf->len = chars;
+ info->nrbufs = ++bufs;
+
total_len -= chars;
if (!total_len)
break;
}
- if (PIPE_FREE(*inode) && ret) {
- /* handle cyclic data buffers */
- min = 1;
+ if (bufs < PIPE_BUFFERS)
continue;
- }
if (filp->f_flags & O_NONBLOCK) {
if (!ret) ret = -EAGAIN;
break;
@@ -283,9 +304,23 @@
pipe_ioctl(struct inode *pino, struct file *filp,
unsigned int cmd, unsigned long arg)
{
+ struct inode *inode = filp->f_dentry->d_inode;
+ struct pipe_inode_info *info;
+ int count, buf, nrbufs;
+
switch (cmd) {
case FIONREAD:
- return put_user(PIPE_LEN(*pino), (int __user *)arg);
+ down(PIPE_SEM(*inode));
+ info = inode->i_pipe;
+ count = 0;
+ buf = info->curbuf;
+ nrbufs = info->nrbufs;
+ while (--nrbufs >= 0) {
+ count += info->bufs[buf].len;
+ buf = (buf+1) & (PIPE_BUFFERS-1);
+ }
+ up(PIPE_SEM(*inode));
+ return put_user(count, (int __user *)arg);
default:
return -EINVAL;
}
@@ -297,13 +332,16 @@
{
unsigned int mask;
struct inode *inode = filp->f_dentry->d_inode;
+ struct pipe_inode_info *info = inode->i_pipe;
+ int nrbufs;
poll_wait(filp, PIPE_WAIT(*inode), wait);
/* Reading only -- no need for acquiring the semaphore. */
- mask = POLLIN | POLLRDNORM;
- if (PIPE_EMPTY(*inode))
- mask = POLLOUT | POLLWRNORM;
+ nrbufs = info->nrbufs;
+ mask = (nrbufs > 0) ? POLLIN | POLLRDNORM : 0;
+ mask |= (nrbufs < PIPE_BUFFERS) ? POLLOUT | POLLWRNORM : 0;
+
if (!PIPE_WRITERS(*inode) && filp->f_version != PIPE_WCOUNTER(*inode))
mask |= POLLHUP;
if (!PIPE_READERS(*inode))
@@ -529,31 +567,37 @@
void free_pipe_info(struct inode *inode)
{
+ int i;
struct pipe_inode_info *info = inode->i_pipe;
+
inode->i_pipe = NULL;
- free_page((unsigned long)info->base);
+ for (i = 0; i < PIPE_BUFFERS; i++) {
+ struct page *page = info->bufs[i].page;
+
+ /* We'll make this a data-dependent free some day .. */
+ if (page)
+ __free_page(page);
+ }
kfree(info);
}
struct inode* pipe_new(struct inode* inode)
{
unsigned long page;
+ struct pipe_inode_info *info;
page = __get_free_page(GFP_USER);
if (!page)
return NULL;
- inode->i_pipe = kmalloc(sizeof(struct pipe_inode_info), GFP_KERNEL);
- if (!inode->i_pipe)
+ info = kmalloc(sizeof(struct pipe_inode_info), GFP_KERNEL);
+ if (!info)
goto fail_page;
+ memset(info, 0, sizeof(*info));
+ inode->i_pipe = info;
init_waitqueue_head(PIPE_WAIT(*inode));
- PIPE_BASE(*inode) = (char*) page;
- PIPE_START(*inode) = PIPE_LEN(*inode) = 0;
- PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 0;
- PIPE_WAITING_WRITERS(*inode) = 0;
PIPE_RCOUNTER(*inode) = PIPE_WCOUNTER(*inode) = 1;
- *PIPE_FASYNC_READERS(*inode) = *PIPE_FASYNC_WRITERS(*inode) = NULL;
return inode;
fail_page:
diff -Nru a/include/linux/pipe_fs_i.h b/include/linux/pipe_fs_i.h
--- a/include/linux/pipe_fs_i.h 2005-01-06 19:13:23 -08:00
+++ b/include/linux/pipe_fs_i.h 2005-01-06 19:13:23 -08:00
@@ -2,10 +2,18 @@
#define _LINUX_PIPE_FS_I_H
#define PIPEFS_MAGIC 0x50495045
+
+#define PIPE_BUFFERS (16)
+
+struct pipe_buffer {
+ struct page *page;
+ unsigned short offset, len;
+};
+
struct pipe_inode_info {
wait_queue_head_t wait;
- char *base;
- unsigned int len;
+ unsigned int nrbufs, curbuf;
+ struct pipe_buffer bufs[PIPE_BUFFERS];
unsigned int start;
unsigned int readers;
unsigned int writers;
@@ -32,13 +40,6 @@
#define PIPE_WCOUNTER(inode) ((inode).i_pipe->w_counter)
#define PIPE_FASYNC_READERS(inode) (&((inode).i_pipe->fasync_readers))
#define PIPE_FASYNC_WRITERS(inode) (&((inode).i_pipe->fasync_writers))
-
-#define PIPE_EMPTY(inode) (PIPE_LEN(inode) == 0)
-#define PIPE_FULL(inode) (PIPE_LEN(inode) == PIPE_SIZE)
-#define PIPE_FREE(inode) (PIPE_SIZE - PIPE_LEN(inode))
-#define PIPE_END(inode) ((PIPE_START(inode) + PIPE_LEN(inode)) & (PIPE_SIZE-1))
-#define PIPE_MAX_RCHUNK(inode) (PIPE_SIZE - PIPE_START(inode))
-#define PIPE_MAX_WCHUNK(inode) (PIPE_SIZE - PIPE_END(inode))
/* Drop the inode semaphore and wait for a pipe event, atomically */
void pipe_wait(struct inode * inode);