LWN.net Logo

Make pipe data structure be a circular list of pages, rather than a circular list of one page

From:  Linux Kernel Mailing List <linux-kernel@vger.kernel.org>
To:  bk-commits-head@vger.kernel.org
Subject:  Make pipe data structure be a circular list of pages, rather than
Date:  Fri, 07 Jan 2005 00:29:13 +0000
Archive-link:  Article, Thread

ChangeSet 1.2229.1.1, 2005/01/06 16:29:13-08:00, torvalds@ppc970.osdl.org

	Make pipe data structure be a circular list of pages, rather than
	a circular list of one page.
	
	This improves pipe throughput, and allows us to (eventually)
	use these lists of page buffers for moving data around efficiently.



 fs/pipe.c                 |  130 ++++++++++++++++++++++++++++++----------------
 include/linux/pipe_fs_i.h |   19 +++---
 2 files changed, 97 insertions(+), 52 deletions(-)


diff -Nru a/fs/pipe.c b/fs/pipe.c
--- a/fs/pipe.c	2005-01-06 19:13:23 -08:00
+++ b/fs/pipe.c	2005-01-06 19:13:23 -08:00
@@ -14,6 +14,8 @@
 #include <linux/mount.h>
 #include <linux/pipe_fs_i.h>
 #include <linux/uio.h>
+#include <linux/highmem.h>
+
 #include <asm/uaccess.h>
 #include <asm/ioctls.h>
 
@@ -89,6 +91,7 @@
 	   unsigned long nr_segs, loff_t *ppos)
 {
 	struct inode *inode = filp->f_dentry->d_inode;
+	struct pipe_inode_info *info;
 	int do_wakeup;
 	ssize_t ret;
 	struct iovec *iov = (struct iovec *)_iov;
@@ -102,32 +105,40 @@
 	do_wakeup = 0;
 	ret = 0;
 	down(PIPE_SEM(*inode));
+	info = inode->i_pipe;
 	for (;;) {
-		int size = PIPE_LEN(*inode);
-		if (size) {
-			char *pipebuf = PIPE_BASE(*inode) + PIPE_START(*inode);
-			ssize_t chars = PIPE_MAX_RCHUNK(*inode);
+		int bufs = info->nrbufs;
+		if (bufs) {
+			int curbuf = info->curbuf;
+			struct pipe_buffer *buf = info->bufs + curbuf;
+			size_t chars = buf->len;
+			int error;
 
 			if (chars > total_len)
 				chars = total_len;
-			if (chars > size)
-				chars = size;
 
-			if (pipe_iov_copy_to_user(iov, pipebuf, chars)) {
+			error = pipe_iov_copy_to_user(iov, kmap(buf->page) + buf->offset, chars);
+			kunmap(buf->page);
+			if (unlikely(error)) {
 				if (!ret) ret = -EFAULT;
 				break;
 			}
 			ret += chars;
-
-			PIPE_START(*inode) += chars;
-			PIPE_START(*inode) &= (PIPE_SIZE - 1);
-			PIPE_LEN(*inode) -= chars;
+			buf->offset += chars;
+			buf->len -= chars;
+			if (!buf->len) {
+				__free_page(buf->page);
+				buf->page = NULL;
+				curbuf = (curbuf + 1) & (PIPE_BUFFERS-1);
+				info->curbuf = curbuf;
+				info->nrbufs = --bufs;
+				do_wakeup = 1;
+			}
 			total_len -= chars;
-			do_wakeup = 1;
 			if (!total_len)
 				break;	/* common path: read succeeded */
 		}
-		if (PIPE_LEN(*inode)) /* test for cyclic buffers */
+		if (bufs)	/* More to do? */
 			continue;
 		if (!PIPE_WRITERS(*inode))
 			break;
@@ -177,8 +188,8 @@
 	    unsigned long nr_segs, loff_t *ppos)
 {
 	struct inode *inode = filp->f_dentry->d_inode;
+	struct pipe_inode_info *info;
 	ssize_t ret;
-	size_t min;
 	int do_wakeup;
 	struct iovec *iov = (struct iovec *)_iov;
 	size_t total_len;
@@ -190,48 +201,58 @@
 
 	do_wakeup = 0;
 	ret = 0;
-	min = total_len;
-	if (min > PIPE_BUF)
-		min = 1;
 	down(PIPE_SEM(*inode));
+	info = inode->i_pipe;
 	for (;;) {
-		int free;
+		int bufs;
 		if (!PIPE_READERS(*inode)) {
 			send_sig(SIGPIPE, current, 0);
 			if (!ret) ret = -EPIPE;
 			break;
 		}
-		free = PIPE_FREE(*inode);
-		if (free >= min) {
-			/* transfer data */
-			ssize_t chars = PIPE_MAX_WCHUNK(*inode);
-			char *pipebuf = PIPE_BASE(*inode) + PIPE_END(*inode);
+		bufs = info->nrbufs;
+		if (bufs < PIPE_BUFFERS) {
+			ssize_t chars;
+			int newbuf = (info->curbuf + bufs) & (PIPE_BUFFERS-1);
+			struct pipe_buffer *buf = info->bufs + newbuf;
+			struct page *page = alloc_page(GFP_USER);
+			int error;
+
+			if (unlikely(!page)) {
+				ret = ret ? : -ENOMEM;
+				break;
+			}
 			/* Always wakeup, even if the copy fails. Otherwise
 			 * we lock up (O_NONBLOCK-)readers that sleep due to
 			 * syscall merging.
+			 * FIXME! Is this really true?
 			 */
 			do_wakeup = 1;
+			chars = PAGE_SIZE;
 			if (chars > total_len)
 				chars = total_len;
-			if (chars > free)
-				chars = free;
 
-			if (pipe_iov_copy_from_user(pipebuf, iov, chars)) {
+			error = pipe_iov_copy_from_user(kmap(page), iov, chars);
+			kunmap(page);
+			if (unlikely(error)) {
 				if (!ret) ret = -EFAULT;
+				__free_page(page);
 				break;
 			}
 			ret += chars;
 
-			PIPE_LEN(*inode) += chars;
+			/* Insert it into the buffer array */
+			buf->page = page;
+			buf->offset = 0;
+			buf->len = chars;
+			info->nrbufs = ++bufs;
+
 			total_len -= chars;
 			if (!total_len)
 				break;
 		}
-		if (PIPE_FREE(*inode) && ret) {
-			/* handle cyclic data buffers */
-			min = 1;
+		if (bufs < PIPE_BUFFERS)
 			continue;
-		}
 		if (filp->f_flags & O_NONBLOCK) {
 			if (!ret) ret = -EAGAIN;
 			break;
@@ -283,9 +304,23 @@
 pipe_ioctl(struct inode *pino, struct file *filp,
 	   unsigned int cmd, unsigned long arg)
 {
+	struct inode *inode = filp->f_dentry->d_inode;
+	struct pipe_inode_info *info;
+	int count, buf, nrbufs;
+
 	switch (cmd) {
 		case FIONREAD:
-			return put_user(PIPE_LEN(*pino), (int __user *)arg);
+			down(PIPE_SEM(*inode));
+			info =  inode->i_pipe;
+			count = 0;
+			buf = info->curbuf;
+			nrbufs = info->nrbufs;
+			while (--nrbufs >= 0) {
+				count += info->bufs[buf].len;
+				buf = (buf+1) & (PIPE_BUFFERS-1);
+			}
+			up(PIPE_SEM(*inode));
+			return put_user(count, (int __user *)arg);
 		default:
 			return -EINVAL;
 	}
@@ -297,13 +332,16 @@
 {
 	unsigned int mask;
 	struct inode *inode = filp->f_dentry->d_inode;
+	struct pipe_inode_info *info = inode->i_pipe;
+	int nrbufs;
 
 	poll_wait(filp, PIPE_WAIT(*inode), wait);
 
 	/* Reading only -- no need for acquiring the semaphore.  */
-	mask = POLLIN | POLLRDNORM;
-	if (PIPE_EMPTY(*inode))
-		mask = POLLOUT | POLLWRNORM;
+	nrbufs = info->nrbufs;
+	mask = (nrbufs > 0) ? POLLIN | POLLRDNORM : 0;
+	mask |= (nrbufs < PIPE_BUFFERS) ? POLLOUT | POLLWRNORM : 0;
+
 	if (!PIPE_WRITERS(*inode) && filp->f_version != PIPE_WCOUNTER(*inode))
 		mask |= POLLHUP;
 	if (!PIPE_READERS(*inode))
@@ -529,31 +567,37 @@
 
 void free_pipe_info(struct inode *inode)
 {
+	int i;
 	struct pipe_inode_info *info = inode->i_pipe;
+
 	inode->i_pipe = NULL;
-	free_page((unsigned long)info->base);
+	for (i = 0; i < PIPE_BUFFERS; i++) {
+		struct page *page = info->bufs[i].page;
+
+		/* We'll make this a data-dependent free some day .. */
+		if (page)
+			__free_page(page);
+	}
 	kfree(info);
 }
 
 struct inode* pipe_new(struct inode* inode)
 {
 	unsigned long page;
+	struct pipe_inode_info *info;
 
 	page = __get_free_page(GFP_USER);
 	if (!page)
 		return NULL;
 
-	inode->i_pipe = kmalloc(sizeof(struct pipe_inode_info), GFP_KERNEL);
-	if (!inode->i_pipe)
+	info = kmalloc(sizeof(struct pipe_inode_info), GFP_KERNEL);
+	if (!info)
 		goto fail_page;
+	memset(info, 0, sizeof(*info));
+	inode->i_pipe = info;
 
 	init_waitqueue_head(PIPE_WAIT(*inode));
-	PIPE_BASE(*inode) = (char*) page;
-	PIPE_START(*inode) = PIPE_LEN(*inode) = 0;
-	PIPE_READERS(*inode) = PIPE_WRITERS(*inode) = 0;
-	PIPE_WAITING_WRITERS(*inode) = 0;
 	PIPE_RCOUNTER(*inode) = PIPE_WCOUNTER(*inode) = 1;
-	*PIPE_FASYNC_READERS(*inode) = *PIPE_FASYNC_WRITERS(*inode) = NULL;
 
 	return inode;
 fail_page:
diff -Nru a/include/linux/pipe_fs_i.h b/include/linux/pipe_fs_i.h
--- a/include/linux/pipe_fs_i.h	2005-01-06 19:13:23 -08:00
+++ b/include/linux/pipe_fs_i.h	2005-01-06 19:13:23 -08:00
@@ -2,10 +2,18 @@
 #define _LINUX_PIPE_FS_I_H
 
 #define PIPEFS_MAGIC 0x50495045
+
+#define PIPE_BUFFERS (16)
+
+struct pipe_buffer {
+	struct page *page;
+	unsigned short offset, len;
+};
+
 struct pipe_inode_info {
 	wait_queue_head_t wait;
-	char *base;
-	unsigned int len;
+	unsigned int nrbufs, curbuf;
+	struct pipe_buffer bufs[PIPE_BUFFERS];
 	unsigned int start;
 	unsigned int readers;
 	unsigned int writers;
@@ -32,13 +40,6 @@
 #define PIPE_WCOUNTER(inode)	((inode).i_pipe->w_counter)
 #define PIPE_FASYNC_READERS(inode)     (&((inode).i_pipe->fasync_readers))
 #define PIPE_FASYNC_WRITERS(inode)     (&((inode).i_pipe->fasync_writers))
-
-#define PIPE_EMPTY(inode)	(PIPE_LEN(inode) == 0)
-#define PIPE_FULL(inode)	(PIPE_LEN(inode) == PIPE_SIZE)
-#define PIPE_FREE(inode)	(PIPE_SIZE - PIPE_LEN(inode))
-#define PIPE_END(inode)	((PIPE_START(inode) + PIPE_LEN(inode)) & (PIPE_SIZE-1))
-#define PIPE_MAX_RCHUNK(inode)	(PIPE_SIZE - PIPE_START(inode))
-#define PIPE_MAX_WCHUNK(inode)	(PIPE_SIZE - PIPE_END(inode))
 
 /* Drop the inode semaphore and wait for a pipe event, atomically */
 void pipe_wait(struct inode * inode);

Copyright © 2005, Eklektix, Inc.
Comments and public postings are copyrighted by their creators.
Linux is a registered trademark of Linus Torvalds