| From: |
| Evgeniy Polyakov <johnpol@2ka.mipt.ru> |
| To: |
| netdev@vger.kernel.org |
| Subject: |
| Receiving zero-copy support. TCP support. |
| Date: |
| Thu, 15 Dec 2005 17:37:51 +0300 |
| Cc: |
| David Miller <davem@davemloft.net>,
Jamal Hadi Salim <hadi@cyberus.ca> |
Hello, developers.
Here is the second stage of receiving zero-copy support.
It now supports all TCP sequence number checks and writes data into
the file according to them.
Unfortunately, there are some caveats:
1. The Linux VFS write path is not atomic. It has two calls, ->prepare_write()
and ->commit_write(), which must be called from process context to perform a
write.
The journaling code [JBD] behind those calls always assumes that
exactly one ->commit_write() follows exactly one ->prepare_write().
So it is not possible to allocate several pages in advance and copy data
into them from interrupt context. If 4k is not a multiple of the
MSS (for example, with a 1500-byte MTU), this becomes an unresolvable problem.
Fortunately, there are some tricks which can be used to fool the VFS
layer. The first one is not to use a JBD-based FS, for example to use ext2.
The second one is to split a data transfer that spans several pages at the page boundaries.
Actually, with some changes to JBD/ext3, receiving zero-copy support works
with ext3, but only for small data transfers (less than 400Mb). All of the
journaling changes just adjust h_buffer_credits when starting a journal with
an existing handle, which happens when ->prepare_write() is called several
times before ->commit_write(). The ext3 changes are simple too: the check for
the current journaling handle in ext3_write_inode(), which is related to the
above changes, is commented out. Beyond that size, ext3_ordered_commit_write()
freezes, probably in journal_stop() on the synchronous transaction batching
trick.
2. The current code will write more than the received data into the file due to
page preallocation, i.e. after a 1Gb transfer the file's size will be larger than that,
with some garbage at the end. This is not a fundamental problem, and it will
be fixed a bit later.
Short design notes (more details can be found in the previous post:
http://marc.theaimsgroup.com/?l=linux-netdev&m=113275....):
If the network adapter is advanced enough to perform header split, or
if it uses "high-performance" data copying from a mapped area (like the 8139),
it is possible to copy only up to the transport layer headers
and provide them to the receiving zero-copy subsystem. The subsystem detects whether
there is a receiving zero-copy socket for those headers and, if so,
provides a data pointer that points not into the slab but to a different destination.
For example, it can be the VFS layer (with pages grabbed and specially prepared),
userspace pages, or even a different NIC's descriptor.
This optimisation completely eliminates any data copying into userspace
and back into the VFS when writing data from a socket into a file.
The attached patch contains:
- the receiving zero-copy subsystem.
- the TCP receiving interface (UDP is included too, although it has not been
tested yet, since it is far simpler than TCP). It is implemented as the
sendfile() system call for the socket->file direction, which was prohibited
before.
- 8139too driver modifications which allow the driver to use the receiving zero-copy
interface.
Also attached is a simple userspace client program which uses the sendfile()
system call for socket->file data transfer.
One can easily create other backends for the receiving zero-copy
interface, such as a UDP/NFS backend or a network->process' pages backend.
The receiving zero-copy interface does not hurt normal network performance,
since it only adds a check of the TCP/UDP protocol, a TCP/UDP address/port hash,
and a walk through a most likely empty list of zero-copy sockets.
Signed-off-by: Evgeniy Polyakov <johnpol@2ka.mipt.ru>
diff --git a/drivers/net/8139too.c b/drivers/net/8139too.c
--- a/drivers/net/8139too.c
+++ b/drivers/net/8139too.c
@@ -82,6 +82,8 @@
Robert Kuebel - Save kernel thread from dying on any signal.
+ Evgeniy Polyakov - Added receiving zero-copy support.
+
Submitting bug reports:
"rtl8139-diag -mmmaaavvveefN" output
@@ -91,7 +93,7 @@
#define DRV_NAME "8139too"
#define DRV_VERSION "0.9.27"
-
+#define DRV_EXT "-zc"
#include <linux/config.h>
#include <linux/module.h>
@@ -108,11 +110,16 @@
#include <linux/mii.h>
#include <linux/completion.h>
#include <linux/crc32.h>
+#include <linux/if_ether.h>
+#include <linux/ip.h>
+#include <linux/tcp.h>
+#include <linux/udp.h>
+#include <linux/workqueue.h>
#include <asm/io.h>
#include <asm/uaccess.h>
#include <asm/irq.h>
-#define RTL8139_DRIVER_NAME DRV_NAME " Fast Ethernet driver " DRV_VERSION
+#define RTL8139_DRIVER_NAME DRV_NAME " Fast Ethernet driver " DRV_VERSION DRV_EXT
#define PFX DRV_NAME ": "
/* Default Message level */
@@ -792,7 +799,7 @@ static int __devinit rtl8139_init_board
/* set this immediately, we need to know before
* we talk to the chip directly */
- DPRINTK("PIO region size == 0x%02X\n", pio_len);
+ DPRINTK("PIO region size == 0x%02lX\n", pio_len);
DPRINTK("MMIO region size == 0x%02lX\n", mmio_len);
#ifdef USE_IO_OPS
@@ -1897,16 +1904,23 @@ static void rtl8139_rx_err (u32 rx_statu
}
#if RX_BUF_IDX == 3
-static __inline__ void wrap_copy(struct sk_buff *skb, const unsigned char *ring,
+static __inline__ void __wrap_copy(void *data, const unsigned char *ring,
u32 offset, unsigned int size)
{
u32 left = RX_BUF_LEN - offset;
if (size > left) {
- memcpy(skb->data, ring + offset, left);
- memcpy(skb->data+left, ring, size - left);
+ memcpy(data, ring + offset, left);
+ memcpy(data+left, ring, size - left);
} else
- memcpy(skb->data, ring + offset, size);
+ memcpy(data, ring + offset, size);
+
+}
+
+static __inline__ void wrap_copy(struct sk_buff *skb, const unsigned char *ring,
+ u32 offset, unsigned int size)
+{
+ __wrap_copy(skb->data, ring, offset, size);
}
#endif
@@ -1928,6 +1942,103 @@ static void rtl8139_isr_ack(struct rtl81
}
}
+/*
+ * rtl8139_copy - copy @size bytes starting at @ring_offset in the Rx ring
+ * to @dst. A zero @size is a no-op. With RX_BUF_IDX == 3 the copy goes
+ * through __wrap_copy(), which continues from the start of the ring when
+ * the range runs past RX_BUF_LEN; otherwise a plain memcpy() is used.
+ * NOTE(review): @size is an int handed to memcpy(); a negative value would
+ * become a huge length, so callers must never pass one.
+ */
+static void rtl8139_copy(void *dst, unsigned char *rx_ring, u32 ring_offset, int size)
+{
+ if (!size)
+ return;
+#if RX_BUF_IDX == 3
+ __wrap_copy(dst, rx_ring, ring_offset, size);
+#else
+ memcpy(dst, &rx_ring[ring_offset], size);
+#endif
+}
+
+/*
+ * rtl8139_move_data - zc_buf->move_data callback for the 8139 driver.
+ *
+ * Copies @sz bytes of the frame at tp->cur_rx, starting @offset bytes after
+ * the first zb->header_size bytes of the frame, into the last page fragment
+ * attached to zb->skb at that fragment's page_offset.
+ *
+ * Returns @sz on success, or -EINVAL if the skb has no usable fragment or
+ * the copy would run past the end of the fragment's page.
+ */
+static int rtl8139_move_data(struct zc_buf *zb, unsigned int offset, unsigned int sz)
+{
+	struct rtl8139_private *tp = zb->priv_data;
+	unsigned char *rx_ring = tp->rx_ring;
+	unsigned int cur_rx = tp->cur_rx;
+	u32 ring_offset = cur_rx % RX_BUF_LEN;
+	struct sk_buff *skb = zb->skb;
+	unsigned int nr_frags = skb_shinfo(skb)->nr_frags;
+	skb_frag_t *frag;
+	void *dest;
+
+	/* frags[] has MAX_SKB_FRAGS slots, so nr_frags == MAX_SKB_FRAGS is valid. */
+	if (unlikely(nr_frags == 0 || nr_frags > MAX_SKB_FRAGS))
+		return -EINVAL;
+
+	frag = &skb_shinfo(skb)->frags[nr_frags - 1];
+
+	/* kmap_atomic() maps exactly one page: never write beyond it. */
+	if (unlikely(frag->page_offset > PAGE_SIZE ||
+		     sz > PAGE_SIZE - frag->page_offset))
+		return -EINVAL;
+
+	dest = kmap_atomic(frag->page, KM_IRQ0);
+	/* The 4 skipped bytes are the Rx status/length word preceding each frame. */
+	rtl8139_copy(dest + frag->page_offset, rx_ring,
+		     ring_offset + 4 + zb->header_size + offset, sz);
+	kunmap_atomic(dest, KM_IRQ0);
+
+	return sz;
+}
+
+/*
+ * rtl8139_copy_header - copy the Ethernet, IPv4 and TCP/UDP headers of the
+ * frame at tp->cur_rx from the Rx ring into zb->header.
+ *
+ * On entry zb->header_size is the capacity of zb->header and zb->size is the
+ * frame length. On success (return 0), zb->header_size is the number of
+ * header bytes copied and zb->size has been reduced by that amount.
+ *
+ * Returns -1 for frames the zero-copy path must not handle, so the caller
+ * falls back to the normal receive path. These are non-IPv4 frames, IP
+ * fragments, protocols other than TCP/UDP, malformed header lengths, headers
+ * that do not fit, and frames shorter than their headers.
+ */
+static int rtl8139_copy_header(struct zc_buf *zb)
+{
+	struct rtl8139_private *tp = zb->priv_data;
+	unsigned char *rx_ring = tp->rx_ring;
+	unsigned int cur_rx = tp->cur_rx;
+	u32 ring_offset = cur_rx % RX_BUF_LEN;
+	u8 *orig_ptr, *ptr = zb->header;
+	unsigned int tocopy, hsize;
+	struct iphdr *iph;
+	struct tcphdr *tcph;
+	struct ethhdr *eth;
+
+	orig_ptr = ptr;
+
+	/* Skip the 4-byte Rx status/length word that precedes each frame. */
+	ring_offset += 4;
+
+	if (sizeof(struct ethhdr) + sizeof(struct iphdr) > zb->header_size)
+		return -1;
+
+	tocopy = sizeof(struct ethhdr);
+	rtl8139_copy(ptr, rx_ring, ring_offset, tocopy);
+	eth = (struct ethhdr *)ptr;
+	ptr += tocopy;
+	ring_offset += tocopy;
+
+	if (eth->h_proto != htons(ETH_P_IP))
+		return -1;
+
+	tocopy = sizeof(struct iphdr);
+	rtl8139_copy(ptr, rx_ring, ring_offset, tocopy);
+	iph = (struct iphdr *)ptr;
+	ptr += tocopy;
+	ring_offset += tocopy;
+
+	/*
+	 * ihl comes straight off the wire. A value below 5 would make the
+	 * option length computed below negative, which rtl8139_copy() would
+	 * turn into a huge copy.
+	 */
+	if (iph->version != 4 || iph->ihl < 5)
+		return -1;
+
+	/*
+	 * Fragments (MF set or a non-zero fragment offset) need IP reassembly;
+	 * their payload cannot be placed directly.
+	 */
+	if (iph->frag_off & htons(0x3fff))
+		return -1;
+
+	if (iph->protocol == IPPROTO_TCP)
+		hsize = sizeof(struct tcphdr);
+	else if (iph->protocol == IPPROTO_UDP)
+		hsize = sizeof(struct udphdr);
+	else
+		return -1;
+
+	/* IP options plus the fixed part of the transport header. */
+	tocopy = iph->ihl * 4 - sizeof(struct iphdr) + hsize;
+	if ((unsigned int)(ptr - orig_ptr) + tocopy > zb->header_size)
+		return -1;
+	rtl8139_copy(ptr, rx_ring, ring_offset, tocopy);
+	ptr += tocopy;
+	ring_offset += tocopy;
+
+	if (iph->protocol == IPPROTO_TCP) {
+		tcph = (struct tcphdr *)((u8 *)iph + iph->ihl * 4);
+		/* Same as ihl: a data offset below 5 words is malformed. */
+		if (tcph->doff < 5)
+			return -1;
+		/* TCP options. */
+		tocopy = tcph->doff * 4 - sizeof(struct tcphdr);
+		if ((unsigned int)(ptr - orig_ptr) + tocopy > zb->header_size)
+			return -1;
+		rtl8139_copy(ptr, rx_ring, ring_offset, tocopy);
+		ptr += tocopy;
+		ring_offset += tocopy;
+	}
+
+	/* A runt frame must not make the unsigned zb->size wrap below. */
+	if ((unsigned int)(ptr - orig_ptr) > zb->size)
+		return -1;
+
+	zb->header_size = ptr - orig_ptr;
+	zb->size -= zb->header_size;
+
+	return 0;
+}
+
static int rtl8139_rx(struct net_device *dev, struct rtl8139_private *tp,
int budget)
{
@@ -1958,8 +2069,7 @@ static int rtl8139_rx(struct net_device
if (netif_msg_rx_status(tp))
printk(KERN_DEBUG "%s: rtl8139_rx() status %4.4x, size %4.4x,"
- " cur %4.4x.\n", dev->name, rx_status,
- rx_size, cur_rx);
+ " cur %4.4x.\n", dev->name, rx_status, rx_size, cur_rx);
#if RTL8139_DEBUG > 2
{
int i;
@@ -2007,37 +2117,76 @@ no_early_rx:
goto out;
}
- /* Malloc up new buffer, compatible with net-2e. */
- /* Omit the four octet CRC from the length. */
+ {
+ u8 zc_data[256];
+ struct zc_buf *zb;
+
+ memset(&zc_data, 0, sizeof(zc_data));
+ zb = (struct zc_buf *)zc_data;
- skb = dev_alloc_skb (pkt_size + 2);
- if (likely(skb)) {
- skb->dev = dev;
- skb_reserve (skb, 2); /* 16 byte align the IP fields. */
+ zb->header = (void *)(zb + 1);
+ zb->header_size = sizeof(zc_data) - sizeof(struct zc_buf);
+ zb->size = pkt_size;
+ zb->priv_data = tp;
+ zb->move_data = &rtl8139_move_data;
+
+ if (!rtl8139_copy_header(zb)) {
+ skb = alloc_skb_zerocopy(zb, GFP_ATOMIC);
+ if (skb) {
+ int err;
+
+ skb->dev = dev;
+ skb->protocol = eth_type_trans(skb, dev);
+
+ dev->last_rx = jiffies;
+ tp->stats.rx_bytes += pkt_size;
+ tp->stats.rx_packets++;
+ err = netif_receive_skb(skb);
+ }
+ } else {
+ skb = NULL;
+ zb->status = -1;
+ }
+
+ if (!skb) {
+ if (zb->status == -1) {
+ /* Malloc up new buffer, compatible with net-2e. */
+ /* Omit the four octet CRC from the length. */
+
+ skb = dev_alloc_skb (pkt_size + 2);
+ if (likely(skb)) {
+ skb->dev = dev;
+ skb_reserve (skb, 2); /* 16 byte align the IP fields. */
#if RX_BUF_IDX == 3
- wrap_copy(skb, rx_ring, ring_offset+4, pkt_size);
+ wrap_copy(skb, rx_ring, ring_offset+4, pkt_size);
#else
- eth_copy_and_sum (skb, &rx_ring[ring_offset + 4], pkt_size, 0);
+ eth_copy_and_sum (skb, &rx_ring[ring_offset + 4], pkt_size, 0);
#endif
- skb_put (skb, pkt_size);
-
- skb->protocol = eth_type_trans (skb, dev);
+ skb_put (skb, pkt_size);
- dev->last_rx = jiffies;
- tp->stats.rx_bytes += pkt_size;
- tp->stats.rx_packets++;
+ skb->protocol = eth_type_trans (skb, dev);
- netif_receive_skb (skb);
- } else {
- if (net_ratelimit())
- printk (KERN_WARNING
- "%s: Memory squeeze, dropping packet.\n",
- dev->name);
- tp->stats.rx_dropped++;
+ dev->last_rx = jiffies;
+ tp->stats.rx_bytes += pkt_size;
+ tp->stats.rx_packets++;
+
+ netif_receive_skb(skb);
+ } else {
+ if (net_ratelimit())
+ printk (KERN_WARNING
+ "%s: Memory squeeze, dropping packet.\n",
+ dev->name);
+ tp->stats.rx_dropped++;
+ }
+ } else {
+ tp->stats.rx_dropped++;
+ }
+ }
}
+
received++;
- cur_rx = (cur_rx + rx_size + 4 + 3) & ~3;
+ tp->cur_rx = cur_rx = (cur_rx + rx_size + 4 + 3) & ~3;
RTL_W16 (RxBufPtr, (u16) (cur_rx - 16));
rtl8139_isr_ack(tp);
diff --git a/fs/read_write.c b/fs/read_write.c
--- a/fs/read_write.c
+++ b/fs/read_write.c
@@ -15,6 +15,8 @@
#include <linux/module.h>
#include <linux/syscalls.h>
+#include <net/sock.h>
+
#include <asm/uaccess.h>
#include <asm/unistd.h>
@@ -670,8 +672,15 @@ static ssize_t do_sendfile(int out_fd, i
if (!(out_file->f_mode & FMODE_WRITE))
goto fput_out;
retval = -EINVAL;
- if (!out_file->f_op || !out_file->f_op->sendpage)
+	if (!out_file->f_op)
+		goto fput_out;
+
+	/*
+	 * ->sendpage() may only be missing when reading from a socket, because
+	 * the socket->file direction is served by receiving zero-copy. Test the
+	 * inode type: SOCKET_I() computes its result with container_of() and
+	 * never returns NULL, so it cannot be used as a "is a socket" check.
+	 */
+	if (!S_ISSOCK(in_file->f_dentry->d_inode->i_mode) && !out_file->f_op->sendpage)
goto fput_out;
+
out_inode = out_file->f_dentry->d_inode;
retval = rw_verify_area(WRITE, out_file, &out_file->f_pos, count);
if (retval)
@@ -688,7 +697,7 @@ static ssize_t do_sendfile(int out_fd, i
retval = -EINVAL;
if (unlikely(pos < 0))
goto fput_out;
- if (unlikely(pos + count > max)) {
+ if (unlikely((unsigned long long)(pos + count) > (unsigned long long)max)) {
retval = -EOVERFLOW;
if (pos >= max)
goto fput_out;
diff --git a/include/asm-i386/socket.h b/include/asm-i386/socket.h
--- a/include/asm-i386/socket.h
+++ b/include/asm-i386/socket.h
@@ -48,5 +48,6 @@
#define SO_ACCEPTCONN 30
#define SO_PEERSEC 31
+#define SO_ZEROCOPY 34
#endif /* _ASM_SOCKET_H */
diff --git a/include/asm-x86_64/socket.h b/include/asm-x86_64/socket.h
--- a/include/asm-x86_64/socket.h
+++ b/include/asm-x86_64/socket.h
@@ -49,4 +49,6 @@
#define SO_PEERSEC 31
+#define SO_ZEROCOPY 34
+
#endif /* _ASM_SOCKET_H */
diff --git a/include/linux/fs.h b/include/linux/fs.h
--- a/include/linux/fs.h
+++ b/include/linux/fs.h
@@ -64,6 +64,7 @@ extern int dir_notify_enable;
#define FMODE_LSEEK 4
#define FMODE_PREAD 8
#define FMODE_PWRITE FMODE_PREAD /* These go hand in hand */
+#define FMODE_ZEROCOPY 16
#define RW_MASK 1
#define RWA_MASK 2
diff --git a/include/linux/net.h b/include/linux/net.h
--- a/include/linux/net.h
+++ b/include/linux/net.h
@@ -174,6 +174,7 @@ struct net_proto_family {
struct iovec;
struct kvec;
+extern int sock_zc_init(struct socket *sock, int fd);
extern int sock_wake_async(struct socket *sk, int how, int band);
extern int sock_register(struct net_proto_family *fam);
extern int sock_unregister(int family);
diff --git a/include/linux/skbuff.h b/include/linux/skbuff.h
--- a/include/linux/skbuff.h
+++ b/include/linux/skbuff.h
@@ -29,11 +29,14 @@
#include <linux/net.h>
#include <linux/textsearch.h>
#include <net/checksum.h>
+#include <net/zerocopy.h>
#define HAVE_ALLOC_SKB /* For the drivers to know */
#define HAVE_ALIGNABLE_SKB /* Ditto 8) */
#define SLAB_SKB /* Slabified skbuffs */
+#define ZEROCOPY_HEADER_CACHE_SIZE 256 /* Maximum receiving zero-copy header size */
+
#define CHECKSUM_NONE 0
#define CHECKSUM_HW 1
#define CHECKSUM_UNNECESSARY 2
@@ -266,7 +269,8 @@ struct sk_buff {
nfctinfo:3;
__u8 pkt_type:3,
fclone:2,
- ipvs_property:1;
+ ipvs_property:1,
+ zerocopy:1;
__be16 protocol;
void (*destructor)(struct sk_buff *skb);
@@ -1072,6 +1076,11 @@ static inline struct sk_buff *dev_alloc_
return __dev_alloc_skb(length, GFP_ATOMIC);
}
+static inline struct sk_buff *alloc_skb_zerocopy(struct zc_buf *zb, gfp_t gfp_mask)
+{
+ return __alloc_skb_zerocopy(zb, gfp_mask);
+}
+
/**
* skb_cow - copy header of skb when it is required
* @skb: buffer to cow
diff --git a/include/net/sock.h b/include/net/sock.h
--- a/include/net/sock.h
+++ b/include/net/sock.h
@@ -54,6 +54,7 @@
#include <asm/atomic.h>
#include <net/dst.h>
#include <net/checksum.h>
+#include <net/zerocopy.h>
/*
* This structure really needs to be cleaned up.
@@ -61,6 +62,8 @@
* the other protocols.
*/
+//#define SOCK_REFCNT_DEBUG
+
/* Define this to get the SOCK_DBG debugging facility. */
#define SOCK_DEBUGGING
#ifdef SOCK_DEBUGGING
@@ -212,6 +215,9 @@ struct sock {
int sk_route_caps;
unsigned long sk_flags;
unsigned long sk_lingertime;
+
+ struct zsock *zsk;
+
/*
* The backlog queue is special, it is always used with
* the per-socket spinlock held and requires low latency
diff --git a/include/net/zerocopy.h b/include/net/zerocopy.h
new file mode 100644
--- /dev/null
+++ b/include/net/zerocopy.h
@@ -0,0 +1,207 @@
+/*
+ * zerocopy.h
+ *
+ * 2005 Copyright (c) Evgeniy Polyakov <johnpol@2ka.mipt.ru>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#ifndef __ZEROCOPY_H
+#define __ZEROCOPY_H
+
+struct sock_zc_setup_data
+{
+ __u32 type;
+ __u32 size;
+ __u8 data[0];
+};
+
+struct tcp_udp_v4_priv
+{
+ __u32 src;
+ __u32 dst;
+ __u16 sport;
+ __u16 dport;
+ int fd;
+ int pnum;
+};
+
+#ifdef __KERNEL__
+
+#include <linux/skbuff.h>
+#include <linux/pagevec.h>
+#include <linux/mempool.h>
+
+struct zc_sock_bucket
+{
+ struct list_head list;
+ rwlock_t lock;
+};
+
+struct zc_buf;
+struct zsock;
+
+struct zc_handler
+{
+ struct list_head zc_entry;
+ int (* alloc_data)(struct zc_handler *, struct zc_buf *);
+ int (* commit_data)(struct zc_handler *, struct zc_buf *);
+ int (* setup)(struct zc_handler *, struct socket *, struct sock_zc_setup_data *);
+ int (* cleanup)(struct zsock *);
+
+ struct zc_sock_bucket *sock_bucket;
+ unsigned int sock_bucket_number;
+
+ atomic_t refcnt;
+};
+
+struct zc_buf
+{
+ struct zc_handler *zh;
+ void *header;
+ unsigned int header_size;
+ unsigned int size;
+ void *priv;
+ int status;
+ struct sk_buff *skb;
+ int (* move_data)(struct zc_buf *zb, unsigned int offset, unsigned int sz);
+ void *priv_data;
+};
+
+extern struct sk_buff *__alloc_skb_zerocopy(struct zc_buf *zb, gfp_t gfp_mask);
+
+enum zc_page_flags {
+ ZC_PAGE_READY = 0,
+};
+
+enum zc_sock_flags {
+ ZSK_DATA_READY = 0,
+};
+
+#define ZC_POOL_SIZE 1024
+
+extern mempool_t *idx_pool;
+
+#define ZC_MAX_IDX 4
+
+enum zc_state {
+ ZC_OK = 0,
+ ZC_GROW_UP,
+ ZC_GROW_DOWN,
+ ZC_GROW_BOTH,
+ ZC_NEXT,
+};
+
+struct zc_index
+{
+ u16 off;
+ u16 size;
+};
+
+struct zc_index_list_entry
+{
+ struct list_head entry;
+ struct zc_index idx;
+};
+
+struct zc_page
+{
+ struct page *page;
+ unsigned int page_offset;
+ unsigned int size;
+ unsigned int used;
+ u32 seq;
+ long flags;
+ spinlock_t lock;
+
+ unsigned int idx_num;
+ struct zc_index idx[ZC_MAX_IDX];
+ struct list_head idx_list;
+};
+
+struct zsock
+{
+ struct list_head zc_entry;
+ struct zc_handler *handler;
+ atomic_t refcnt;
+ struct sock *sk;
+ int (* zc_alloc_data)(struct zc_buf *zb);
+ int (* zc_commit_data)(struct zc_buf *zb);
+ wait_queue_head_t zc_data_ready;
+ rwlock_t zc_lock;
+ struct zc_page *zc_pages;
+ long zc_flags;
+ unsigned int zc_page_num, zc_page_index;
+ struct pagevec zc_lru_pvec;
+ loff_t zc_pos;
+ struct page *zc_cached_page;
+ struct file *zc_file;
+ u32 zc_seq_first;
+ void *priv;
+ unsigned int priv_size;
+};
+
+int sock_zc_setup_seq(struct zsock *zsk, u32 seq);
+void sk_zc_fini(struct zsock *zsk);
+
+int zc_setup(struct socket *sk, void *data, unsigned int size);
+void zc_cleanup(struct zsock *zsk);
+
+int zc_sock_alloc_data(struct zc_buf *zb);
+int zc_sock_commit_data(struct zc_buf *zb);
+
+int zc_alloc_data(struct zc_buf *zb);
+int zc_commit_data(struct zc_buf *zb);
+
+struct zsock *zsk_alloc(struct zc_handler *handler, void *priv, unsigned int priv_size, int (*insert)(struct zsock *zsk), gfp_t gfp_mask);
+void zsk_free(struct zsock *zsk);
+
+static inline void zc_handler_get(struct zc_handler *zc)
+{
+ atomic_inc(&zc->refcnt);
+}
+
+static inline void zc_handler_put(struct zc_handler *zc)
+{
+ if (atomic_dec_and_test(&zc->refcnt))
+ printk(KERN_DEBUG "Releasing zc=%p.\n", zc);
+}
+
+static inline void *zsk_priv(struct zsock *zsk)
+{
+ return zsk->priv;
+}
+
+static inline void zsk_get(struct zsock *zsk)
+{
+ atomic_inc(&zsk->refcnt);
+}
+
+static inline void zsk_put(struct zsock *zsk)
+{
+ if (atomic_dec_and_test(&zsk->refcnt))
+ zsk_free(zsk);
+}
+
+int tcp_udp_v4_zc_sock_insert(struct zsock *zsk);
+int tcp_udp_v4_sock_zc_init(struct socket *sock, struct tcp_udp_v4_priv *priv);
+extern struct zc_handler tcp_udp_v4_zc_handler;
+
+int commit_page(struct zc_page *zp, struct file *file, struct address_space *mapping);
+int prepare_page(struct zc_page *zp, struct zsock *zsk, struct file *file, struct address_space *mapping,
+		 loff_t *ppos, loff_t count, struct pagevec *lru_pvec);
+#endif /* __KERNEL__ */
+#endif /* __ZEROCOPY_H */
diff --git a/mm/filemap.c b/mm/filemap.c
--- a/mm/filemap.c
+++ b/mm/filemap.c
@@ -1663,7 +1663,7 @@ EXPORT_SYMBOL(read_cache_page);
* caller's lru-buffering pagevec. This function is specifically for
* generic_file_write().
*/
-static inline struct page *
+struct page *
__grab_cache_page(struct address_space *mapping, unsigned long index,
struct page **cached_page, struct pagevec *lru_pvec)
{
@@ -1692,6 +1692,8 @@ repeat:
return page;
}
+EXPORT_SYMBOL_GPL(__grab_cache_page);
+
/*
* The logic we want is
*
diff --git a/net/core/Makefile b/net/core/Makefile
--- a/net/core/Makefile
+++ b/net/core/Makefile
@@ -3,7 +3,7 @@
#
obj-y := sock.o request_sock.o skbuff.o iovec.o datagram.o stream.o scm.o \
- gen_stats.o gen_estimator.o
+ gen_stats.o gen_estimator.o zerocopy.o
obj-$(CONFIG_SYSCTL) += sysctl_net_core.o
diff --git a/net/core/datagram.c b/net/core/datagram.c
--- a/net/core/datagram.c
+++ b/net/core/datagram.c
@@ -214,6 +214,9 @@ int skb_copy_datagram_iovec(const struct
int i, err, fraglen, end = 0;
struct sk_buff *next = skb_shinfo(skb)->frag_list;
+ if (skb->zerocopy)
+ return 0;
+
if (!len)
return 0;
@@ -382,6 +385,9 @@ int skb_copy_and_csum_datagram_iovec(str
{
unsigned int csum;
int chunk = skb->len - hlen;
+
+ if (skb->zerocopy)
+ return 0;
/* Skip filled elements.
* Pretty silly, look at memcpy_toiovec, though 8)
diff --git a/net/core/skbuff.c b/net/core/skbuff.c
--- a/net/core/skbuff.c
+++ b/net/core/skbuff.c
@@ -70,6 +70,7 @@
static kmem_cache_t *skbuff_head_cache __read_mostly;
static kmem_cache_t *skbuff_fclone_cache __read_mostly;
+static kmem_cache_t *skbuff_head_cache_zerocopy __read_mostly;
/*
* Keep out-of-line to prevent kernel bloat.
@@ -186,6 +187,62 @@ nodata:
goto out;
}
+/*
+ * __alloc_skb_zerocopy - allocate an skb for a receiving zero-copy packet.
+ * @zb: packet description; zb->header/zb->header_size hold the protocol
+ *	headers copied by the driver.
+ * @gfp_mask: allocation flags.
+ *
+ * The linear area comes from skbuff_head_cache_zerocopy and receives only
+ * the headers. The skb is then offered to the registered zero-copy handlers
+ * through zc_alloc_data(). zb->status is set to -1 on entry.
+ *
+ * Returns the skb, or NULL if the headers exceed ZEROCOPY_HEADER_CACHE_SIZE,
+ * an allocation fails, or no handler accepts the packet. On failure after
+ * the skb was built, zb->skb is reset to NULL.
+ */
+struct sk_buff *__alloc_skb_zerocopy(struct zc_buf *zb, gfp_t gfp_mask)
+{
+	struct sk_buff *skb = NULL;
+	void *data;
+	int err;
+	unsigned int size = SKB_DATA_ALIGN(zb->header_size);
+
+	zb->status = -1;
+
+	if (size > ZEROCOPY_HEADER_CACHE_SIZE)
+		goto out;
+
+	skb = kmem_cache_alloc(skbuff_head_cache, gfp_mask & ~__GFP_DMA);
+	if (!skb)
+		goto out;
+
+	data = kmem_cache_alloc(skbuff_head_cache_zerocopy, gfp_mask & ~__GFP_DMA);
+	if (!data)
+		goto err_out_free_skb;
+
+	memset(skb, 0, offsetof(struct sk_buff, truesize));
+	skb->truesize = size + sizeof(struct sk_buff);
+	atomic_set(&skb->users, 1);
+	skb->head = data;
+	skb->data = data;
+	skb->tail = data;
+	skb->end = data + size;
+
+	atomic_set(&(skb_shinfo(skb)->dataref), 1);
+	skb_shinfo(skb)->nr_frags = 0;
+	skb_shinfo(skb)->tso_size = 0;
+	skb_shinfo(skb)->tso_segs = 0;
+	skb_shinfo(skb)->frag_list = NULL;
+
+	/*
+	 * NOTE(review): nothing here checksums the payload, yet the skb claims
+	 * it was verified. Corrupted data would be accepted unless a handler
+	 * checks it -- confirm.
+	 */
+	skb->ip_summed = CHECKSUM_UNNECESSARY;
+	skb->zerocopy = 1;
+	/* It could be zerocopied too, but let's use it as is for now. --zbr 2005_10_27 */
+	memcpy(skb->data, zb->header, zb->header_size);
+	skb_put(skb, zb->header_size);
+
+	zb->skb = skb;
+
+	err = zc_alloc_data(zb);
+	if (err)
+		goto err_out_free_skb_data;
+
+out:
+	return skb;
+err_out_free_skb_data:
+	kmem_cache_free(skbuff_head_cache_zerocopy, data);
+err_out_free_skb:
+	kmem_cache_free(skbuff_head_cache, skb);
+	skb = NULL;
+	/* Do not leave the caller holding a pointer to the freed skb. */
+	zb->skb = NULL;
+	goto out;
+}
+
/**
* alloc_skb_from_cache - allocate a network buffer
* @cp: kmem_cache from which to allocate the data area
@@ -288,7 +345,10 @@ void kfree_skbmem(struct sk_buff *skb)
struct sk_buff *other;
atomic_t *fclone_ref;
- skb_release_data(skb);
+ if (skb->zerocopy)
+ kmem_cache_free(skbuff_head_cache_zerocopy, skb->head);
+ else
+ skb_release_data(skb);
switch (skb->fclone) {
case SKB_FCLONE_UNAVAILABLE:
kmem_cache_free(skbuff_head_cache, skb);
@@ -412,6 +472,7 @@ struct sk_buff *skb_clone(struct sk_buff
C(priority);
C(protocol);
n->destructor = NULL;
+ n->zerocopy = 0;
#ifdef CONFIG_NETFILTER
C(nfmark);
C(nfct);
@@ -477,6 +538,7 @@ static void copy_skb_header(struct sk_bu
memcpy(new->cb, old->cb, sizeof(old->cb));
new->local_df = old->local_df;
new->fclone = SKB_FCLONE_UNAVAILABLE;
+ new->zerocopy = 0;
new->pkt_type = old->pkt_type;
new->tstamp = old->tstamp;
new->destructor = NULL;
@@ -1803,6 +1865,14 @@ void __init skb_init(void)
NULL, NULL);
if (!skbuff_fclone_cache)
panic("cannot create skbuff cache");
+
+ skbuff_head_cache_zerocopy = kmem_cache_create("skbuff_head_cache_zerocopy",
+ ZEROCOPY_HEADER_CACHE_SIZE + sizeof(struct skb_shared_info),
+ 0,
+ SLAB_HWCACHE_ALIGN,
+ NULL, NULL);
+ if (!skbuff_head_cache_zerocopy)
+ panic("cannot create zerocopy skbuff cache");
}
EXPORT_SYMBOL(___pskb_trim);
@@ -1837,3 +1907,4 @@ EXPORT_SYMBOL(skb_seq_read);
EXPORT_SYMBOL(skb_abort_seq_read);
EXPORT_SYMBOL(skb_find_text);
EXPORT_SYMBOL(skb_append_datato_frags);
+EXPORT_SYMBOL(__alloc_skb_zerocopy);
diff --git a/net/core/sock.c b/net/core/sock.c
--- a/net/core/sock.c
+++ b/net/core/sock.c
@@ -129,6 +129,8 @@
#include <net/tcp.h>
#endif
+#include <net/zerocopy.h>
+
/* Take into consideration the size of the struct sk_buff overhead in the
* determination of these values, since that is non-constant across
* platforms. This makes socket queueing behavior and performance
@@ -455,6 +457,18 @@ set_rcvbuf:
spin_unlock_bh(&sk->sk_lock.slock);
ret = -ENONET;
break;
+	case SO_ZEROCOPY:
+	{
+		/* Setup request: struct sock_zc_setup_data plus its payload. */
+		u8 zcdata[256];
+
+		ret = -EINVAL;
+		if (optlen < 0 || optlen > sizeof(zcdata))
+			break;
+		ret = -EFAULT;
+		if (copy_from_user(zcdata, optval, optlen))
+			break;
+		ret = zc_setup(sock, zcdata, optlen);
+	}
+	break;
/* We implement the SO_SNDLOWAT etc to
not be settable (1003.1g 5.3) */
@@ -684,6 +698,9 @@ void sk_free(struct sock *sk)
if (sk->sk_destruct)
sk->sk_destruct(sk);
+	{
+		/*
+		 * sk_zc_fini() clears sk->zsk when the zsock is attached to this
+		 * sock, so keep our own copy to drop the generic reference.
+		 */
+		struct zsock *zsk = sk->zsk;
+
+		sk_zc_fini(zsk);
+		zc_cleanup(zsk);
+	}
+
filter = sk->sk_filter;
if (filter) {
sk_filter_release(sk, filter);
diff --git a/net/core/zerocopy.c b/net/core/zerocopy.c
new file mode 100644
--- /dev/null
+++ b/net/core/zerocopy.c
@@ -0,0 +1,601 @@
+/*
+ * zerocopy.c
+ *
+ * 2005 Copyright (c) Evgeniy Polyakov <johnpol@2ka.mipt.ru>
+ * All rights reserved.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ */
+
+#include <linux/config.h>
+#include <linux/mm.h>
+#include <linux/spinlock.h>
+#include <linux/list.h>
+#include <linux/skbuff.h>
+#include <linux/pagemap.h>
+#include <linux/swap.h>
+#include <linux/writeback.h>
+#include <linux/ip.h>
+#include <linux/tcp.h>
+#include <linux/tcp.h>
+#include <linux/udp.h>
+#include <linux/fs.h>
+#include <linux/file.h>
+
+#include <asm/semaphore.h>
+
+#include <net/inet_hashtables.h>
+#include <net/zerocopy.h>
+
+static int tcp_udp_v4_sendfile_alloc_data(struct zc_handler *zh, struct zc_buf *zb);
+static int tcp_udp_v4_sendfile_commit_data(struct zc_handler *zh, struct zc_buf *zb);
+static int tcp_udp_v4_sendfile_setup(struct zc_handler *zh, struct socket *sock, struct sock_zc_setup_data *p);
+static int tcp_udp_v4_sendfile_cleanup(struct zsock *zsk);
+
+#define ZC_HASH_MASK 0xf
+static struct zc_sock_bucket tcp_udp_v4_sock_bucket[ZC_HASH_MASK];
+
+struct zc_handler tcp_udp_v4_zc_handler = {
+ .alloc_data = &tcp_udp_v4_sendfile_alloc_data,
+ .commit_data = &tcp_udp_v4_sendfile_commit_data,
+ .setup = &tcp_udp_v4_sendfile_setup,
+ .cleanup = &tcp_udp_v4_sendfile_cleanup,
+ .sock_bucket = tcp_udp_v4_sock_bucket,
+ .sock_bucket_number = ZC_HASH_MASK,
+};
+
+static DECLARE_MUTEX(zc_handler_lock);
+static LIST_HEAD(zc_handler_list);
+
+static kmem_cache_t *idx_cache;
+mempool_t *idx_pool;
+
+/*
+ * zc_init - create the slab cache and the mempool (ZC_POOL_SIZE reserved
+ * elements) that back struct zc_index_list_entry allocations.
+ * Returns 0, or -ENOMEM with nothing left allocated.
+ */
+static int zc_init(void)
+{
+	idx_cache = kmem_cache_create("zc_index", sizeof(struct zc_index_list_entry), 0,
+				      SLAB_HWCACHE_ALIGN, NULL, NULL);
+	if (!idx_cache)
+		return -ENOMEM;
+
+	idx_pool = mempool_create(ZC_POOL_SIZE, mempool_alloc_slab, mempool_free_slab, idx_cache);
+	if (!idx_pool) {
+		kmem_cache_destroy(idx_cache);
+		idx_cache = NULL;
+		return -ENOMEM;
+	}
+
+	return 0;
+}
+
+/*
+ * zc_alloc_data - offer @zb to each registered handler's ->alloc_data().
+ * Packets whose remaining size exceeds PAGE_SIZE are refused outright.
+ * The first handler that returns 0 is recorded in zb->zh. Returns 0 on
+ * success, otherwise the last handler's error (-ENODEV if none was tried).
+ * The handler list is walked under rcu_read_lock().
+ */
+int zc_alloc_data(struct zc_buf *zb)
+{
+ struct zc_handler *zh;
+ int err = -ENODEV;
+
+ if (unlikely(zb->size > PAGE_SIZE))
+ return err;
+
+ rcu_read_lock();
+ list_for_each_entry_rcu(zh, &zc_handler_list, zc_entry) {
+ err = zh->alloc_data(zh, zb);
+ if (!err) {
+ zb->zh = zh;
+ break;
+ }
+ }
+ rcu_read_unlock();
+
+ return err;
+}
+
+/*
+ * zc_commit_data - pass @zb to the ->commit_data() of the handler that
+ * accepted it in zc_alloc_data(). Returns -EINVAL if no handler did.
+ */
+int zc_commit_data(struct zc_buf *zb)
+{
+ int err = -EINVAL;
+
+ if (zb->zh)
+ err = zb->zh->commit_data(zb->zh, zb);
+
+ return err;
+}
+
+/*
+ * zc_cleanup - drop one reference on @zsk (NULL is ignored); the zsock is
+ * freed by zsk_put() when the last reference goes away.
+ */
+void zc_cleanup(struct zsock *zsk)
+{
+ if (!zsk)
+ return;
+
+ zsk_put(zsk);
+}
+
+/*
+ * zc_setup - handle an SO_ZEROCOPY request for @sock.
+ * @data, @size: a struct sock_zc_setup_data header followed by exactly
+ *	p->size (network byte order) bytes of handler-specific payload.
+ *
+ * Offers the request to each registered handler until one's ->setup()
+ * returns 0. Returns 0 on success, -EINVAL for a malformed request, or
+ * -ENODEV if no handler accepted it.
+ */
+int zc_setup(struct socket *sock, void *data, unsigned int size)
+{
+	struct sock_zc_setup_data *p = data;
+	struct zc_handler *zh;
+	int err = -ENODEV;
+
+	/*
+	 * Compare p->size with the payload length instead of adding the
+	 * header size to it: with a 32-bit size_t that sum can wrap for a
+	 * hostile p->size and pass the check.
+	 */
+	if (size <= sizeof(struct sock_zc_setup_data) ||
+	    ntohl(p->size) != size - sizeof(struct sock_zc_setup_data))
+		return -EINVAL;
+
+	down(&zc_handler_lock);
+	list_for_each_entry(zh, &zc_handler_list, zc_entry) {
+		if (!zh->setup(zh, sock, p)) {
+			err = 0;
+			break;
+		}
+	}
+	up(&zc_handler_lock);
+
+	return err;
+}
+
+/*
+ * zc_add_handler - register a receiving zero-copy handler.
+ * Returns -EINVAL if a mandatory callback or the socket hash is missing.
+ */
+int zc_add_handler(struct zc_handler *h)
+{
+	if (!h->alloc_data || !h->commit_data || !h->sock_bucket || !h->sock_bucket_number ||
+	    !h->setup || !h->cleanup)
+		return -EINVAL;
+
+	/* list_add_rcu() publishes the fully initialised handler to RCU readers. */
+	down(&zc_handler_lock);
+	list_add_rcu(&h->zc_entry, &zc_handler_list);
+	up(&zc_handler_lock);
+
+	return 0;
+}
+
+/*
+ * zc_del_handler - unregister a handler.
+ *
+ * zc_alloc_data() walks the handler list under rcu_read_lock(). The grace
+ * period must therefore be waited for *after* unlinking: only then is no
+ * reader still using @h when this function returns.
+ */
+void zc_del_handler(struct zc_handler *h)
+{
+	down(&zc_handler_lock);
+	list_del_rcu(&h->zc_entry);
+	up(&zc_handler_lock);
+
+	synchronize_rcu();
+}
+
+/*
+ * zc_clean_page - reset the index bookkeeping of @zp.
+ * When idx_num fits the inline idx[] array (<= ZC_MAX_IDX), that array is
+ * zeroed. Otherwise the entries on idx_list are returned to idx_pool, and
+ * the inline array is left untouched. In both cases idx_list is
+ * re-initialised and idx_num is reset to 0.
+ */
+static inline void zc_clean_page(struct zc_page *zp)
+{
+ if (likely(zp->idx_num <= ZC_MAX_IDX)) {
+ memset(&zp->idx, 0, sizeof(zp->idx));
+ } else {
+ struct zc_index_list_entry *e, *n;
+
+ list_for_each_entry_safe(e, n, &zp->idx_list, entry) {
+ list_del(&e->entry);
+ mempool_free(e, idx_pool);
+ }
+ }
+
+ INIT_LIST_HEAD(&zp->idx_list);
+ zp->idx_num = 0;
+}
+
+extern struct page * __grab_cache_page(struct address_space *mapping, unsigned long index,
+ struct page **cached_page, struct pagevec *lru_pvec);
+
+/*
+ * commit_page - finish the write started by prepare_page() on @zp.
+ *
+ * Calls ->commit_write() for the zp->used bytes at zp->page_offset, then
+ * unlocks and releases the page, and throttles dirty pages if anything was
+ * written. zp->page itself is kept because prepare_page() reads its ->index.
+ *
+ * A zc_page without a page (never prepared, or whose prepare failed) is
+ * skipped. Returns 0, or a negative error from ->commit_write().
+ */
+int commit_page(struct zc_page *zp, struct file *file, struct address_space *mapping)
+{
+	int err;
+	struct address_space_operations *a_ops = mapping->a_ops;
+
+	if (!zp->page)
+		return 0;
+
+	/*
+	 * Not interruptible: the page is still locked and referenced from
+	 * prepare_page(). Bailing out here, for example while the owner is
+	 * being killed, would leave it locked forever.
+	 */
+	down(&mapping->host->i_sem);
+
+	ClearPageReserved(zp->page);
+	flush_dcache_page(zp->page);
+	err = a_ops->commit_write(file, zp->page, zp->page_offset, zp->page_offset+zp->used);
+	unlock_page(zp->page);
+	mark_page_accessed(zp->page);
+	page_cache_release(zp->page);
+
+	if (err >= 0 && zp->used)
+		balance_dirty_pages_ratelimited(mapping);
+
+	up(&mapping->host->i_sem);
+	return err;
+}
+
+/*
+ * prepare_page - grab the next page-cache page for @zp and ->prepare_write() it.
+ *
+ * If @zp already had a page, the new page index is the old one plus
+ * zsk->zc_page_num, the offset within the page is kept, and zp->seq advances
+ * by zc_page_num pages. Otherwise the index and offset are derived from
+ * *@ppos.
+ *
+ * At most @count bytes, clipped to the end of the page, are prepared, and
+ * *@ppos is advanced by that amount. The page is left locked and reserved
+ * until commit_page().
+ *
+ * Returns 0 or a negative error. If grabbing or preparing the page fails,
+ * zp->page is left NULL.
+ */
+int prepare_page(struct zc_page *zp, struct zsock *zsk, struct file *file, struct address_space *mapping,
+		 loff_t *ppos, loff_t count, struct pagevec *lru_pvec)
+{
+	unsigned long index;
+	unsigned long page_offset;
+	unsigned long bytes;
+	struct address_space_operations *a_ops = mapping->a_ops;
+	loff_t pos_allocated = *ppos;
+	int err = 0;
+	unsigned int diff = zsk->zc_page_num << PAGE_CACHE_SHIFT;
+	u32 new_seq = zp->seq + diff;
+
+	if (likely(zp->page)) {
+		/*
+		 * NOTE(review): zp->page has normally been released by
+		 * commit_page() already. Only its ->index is read, which assumes
+		 * the struct page still describes the same file page -- confirm.
+		 */
+		page_offset = zp->page_offset;
+		index = zp->page->index + (diff >> PAGE_CACHE_SHIFT);
+	} else {
+		page_offset = (pos_allocated & (PAGE_CACHE_SIZE - 1));
+		index = pos_allocated >> PAGE_CACHE_SHIFT;
+	}
+
+	bytes = PAGE_CACHE_SIZE - page_offset;
+	if (bytes > count)
+		bytes = count;
+
+	if (down_interruptible(&mapping->host->i_sem)) {
+		err = -EBUSY;
+		goto err_out;
+	}
+	zp->page = __grab_cache_page(mapping, index, &zsk->zc_cached_page, lru_pvec);
+	if (!zp->page) {
+		err = -ENOMEM;
+		goto err_out_exit;
+	}
+
+	err = a_ops->prepare_write(file, zp->page, page_offset, page_offset+bytes);
+	if (unlikely(err)) {
+		unlock_page(zp->page);
+		page_cache_release(zp->page);
+		/* Our reference is gone; commit_page() must not release it again. */
+		zp->page = NULL;
+		goto err_out_exit;
+	}
+	SetPageReserved(zp->page);
+
+	zc_clean_page(zp);
+
+	zp->page_offset = page_offset;
+	zp->size = bytes;
+	zp->used = 0;
+	zp->seq = new_seq;
+	clear_bit(ZC_PAGE_READY, &zp->flags);
+
+	pos_allocated += bytes;
+
+	*ppos = pos_allocated;
+
+err_out_exit:
+	up(&mapping->host->i_sem);
+err_out:
+	return err;
+}
+
+
+/*
+ * sk_zc_fini - tear down the receiving zero-copy state of @zsk (NULL is ignored).
+ *
+ * Under zc_lock it takes ownership of the zc_pages array, clears the page
+ * and callback state, and detaches @zsk from its sock. This also clears
+ * sk->zsk, so a caller that still needs the zsock must have saved it.
+ * After synchronize_rcu(), if pages were set up, it:
+ *   - purges the sock's receive queue;
+ *   - runs the handler's ->cleanup() and drops the handler reference;
+ *   - commits every zc_page;
+ *   - releases the cached page and flushes the LRU pagevec;
+ *   - clears FMODE_ZEROCOPY and drops the file reference.
+ * A second call finds zc_page_num == 0 and does none of this.
+ *
+ * NOTE(review): synchronize_rcu() and commit_page() (which takes i_sem)
+ * sleep. Confirm that no caller runs in atomic context.
+ */
+void sk_zc_fini(struct zsock *zsk)
+{
+ if (zsk) {
+ unsigned int zc_page_num;
+ struct zc_page *zc_pages;
+ unsigned long flags;
+ struct sock *sk = NULL;
+
+ write_lock_irqsave(&zsk->zc_lock, flags);
+ zc_page_num = zsk->zc_page_num;
+ zc_pages = zsk->zc_pages;
+
+ zsk->zc_pages = NULL;
+ zsk->zc_page_num = 0;
+ zsk->zc_page_index = 0;
+ zsk->zc_alloc_data = NULL;
+ zsk->zc_commit_data = NULL;
+ /* Break the sock <-> zsock link in both directions. */
+ if (zsk->sk) {
+ sk = zsk->sk;
+ zsk->sk->zsk = NULL;
+ zsk->sk = NULL;
+ }
+ write_unlock_irqrestore(&zsk->zc_lock, flags);
+
+ synchronize_rcu();
+
+ /*
+ * NOTE(review): ->cleanup() and zc_handler_put() only run when pages
+ * were set up. A zsock that was created but never given pages keeps
+ * its handler reference, and stays in any lookup hash that ->cleanup()
+ * would have removed it from -- verify.
+ */
+ if (zc_page_num) {
+ struct address_space *mapping = zsk->zc_file->f_mapping;
+ loff_t size = 0;
+ int i;
+
+ if (sk)
+ skb_queue_purge(&sk->sk_receive_queue);
+
+ zsk->handler->cleanup(zsk);
+ zc_handler_put(zsk->handler);
+
+ /*
+ * After the locked changes above, no new skb can contribute data to
+ * the VFS cache: allocation will always fail. We only have to care
+ * about skbs already in the socket queue, or inserted there right
+ * after allocation.
+ */
+
+ for (i=0; i<zc_page_num; ++i) {
+ struct zc_page *zp = &zc_pages[i];
+
+ size += zp->used;
+ commit_page(zp, zsk->zc_file, mapping);
+ zc_clean_page(zp);
+ }
+
+ if (zsk->zc_cached_page) {
+ page_cache_release(zsk->zc_cached_page);
+ zsk->zc_cached_page = NULL;
+ }
+
+ pagevec_lru_add(&zsk->zc_lru_pvec);
+
+ /*
+ * NOTE(review): f_pos is reset to 0 only when no data was received.
+ * This may be intended, or the condition may be inverted -- confirm.
+ */
+ if (!size)
+ zsk->zc_file->f_pos = size;
+ zsk->zc_file->f_mode &= ~FMODE_ZEROCOPY;
+ fput(zsk->zc_file);
+ zsk->zc_file = NULL;
+
+ kfree(zc_pages);
+ }
+ }
+}
+
+/*
+ * sk_zc_init - initialise the lock and wait queue of a new zsock and reset
+ * its page, callback, file and cached-page state. zsk_alloc() already
+ * obtains the zsock zeroed from kzalloc(); the explicit resets are kept anyway.
+ */
+static void sk_zc_init(struct zsock *zsk)
+{
+ rwlock_init(&zsk->zc_lock);
+ init_waitqueue_head(&zsk->zc_data_ready);
+ zsk->zc_pages = NULL;
+ zsk->zc_page_num = 0;
+ zsk->zc_page_index = 0;
+ zsk->zc_alloc_data = NULL;
+ zsk->zc_commit_data = NULL;
+ zsk->zc_file = NULL;
+ zsk->zc_cached_page = NULL;
+}
+
+/*
+ * zsk_alloc - allocate a zsock bound to @handler.
+ * @priv, @priv_size: handler-private data, copied into storage that is
+ *	allocated right after the zsock (see zsk_priv()); may be empty.
+ * @insert: optional hook, such as tcp_udp_v4_zc_sock_insert(), that
+ *	publishes the new zsock. If it fails, the zsock is freed.
+ * @gfp_mask: allocation flags.
+ *
+ * Takes a reference on @handler. Returns the zsock holding one reference,
+ * or NULL if allocation or @insert fails.
+ */
+struct zsock *zsk_alloc(struct zc_handler *handler, void *priv, unsigned int priv_size,
+			int (*insert)(struct zsock *zsk), gfp_t gfp_mask)
+{
+	struct zsock *zsk;
+
+	zsk = kzalloc(sizeof(struct zsock) + priv_size, gfp_mask);
+	if (!zsk)
+		return NULL;
+
+	/* 1 for generic socket usage, i.e. it could be removed from sock_close(). */
+	atomic_set(&zsk->refcnt, 1);
+	zsk->handler = handler;
+	zsk->priv_size = priv_size;
+	if (priv_size) {
+		zsk->priv = zsk + 1;
+		memcpy(zsk->priv, priv, priv_size);
+	} else
+		zsk->priv = NULL;
+
+	zc_handler_get(handler);
+
+	sk_zc_init(zsk);
+
+	if (insert && insert(zsk)) {
+		zc_handler_put(handler);
+		zsk_free(zsk);
+		return NULL;
+	}
+
+	return zsk;
+}
+
+/*
+ * zsk_free - free a zsock together with its trailing private area (both
+ * come from the single kzalloc() in zsk_alloc()). It neither drops the
+ * handler reference nor unhashes the zsock. zsk_put() calls it when the
+ * last reference goes away.
+ */
+void zsk_free(struct zsock *zsk)
+{
+ kfree(zsk);
+}
+
+/*
+ * tcp_udp_v4_hash - map a 4-tuple to a bucket index in [0, bucket_number).
+ *
+ * Use modulo rather than masking with (bucket_number - 1). A mask only
+ * reaches every bucket when bucket_number is a power of two. The table here
+ * has ZC_HASH_MASK (0xf = 15) entries, and masking with 14 would only ever
+ * select the even-numbered buckets. For power-of-two sizes the result is
+ * unchanged.
+ */
+static inline u32 tcp_udp_v4_hash(unsigned int bucket_number, const u32 src, const u16 sport, const u32 dst, const u16 dport)
+{
+	return inet_ehashfn(src, sport, dst, dport) % bucket_number;
+}
+
+/*
+ * Publish @zsk in its handler's socket hash, keyed by the 4-tuple stored in
+ * its tcp_udp_v4_priv, so that tcp_udp_v4_zc_sock_lookup() can find it.
+ * Returns -ENODEV when the zsock has no private tuple, 0 otherwise.
+ */
+int tcp_udp_v4_zc_sock_insert(struct zsock *zsk)
+{
+	struct tcp_udp_v4_priv *priv = zsk_priv(zsk);
+	struct zc_sock_bucket *bucket;
+	unsigned long flags;
+	u32 slot;
+
+	if (priv == NULL)
+		return -ENODEV;
+
+	slot = tcp_udp_v4_hash(zsk->handler->sock_bucket_number,
+			priv->src, priv->sport, priv->dst, priv->dport);
+	bucket = &zsk->handler->sock_bucket[slot];
+
+	/* Writers serialise on the bucket lock; the lookup side walks the list under RCU. */
+	write_lock_irqsave(&bucket->lock, flags);
+	list_add_rcu(&zsk->zc_entry, &bucket->list);
+	write_unlock_irqrestore(&bucket->lock, flags);
+
+	return 0;
+}
+
+/*
+ * Unhash @zsk from its handler's socket hash.  Returns -ENODEV when the
+ * zsock has no private tuple, 0 otherwise.
+ *
+ * NOTE(review): no RCU grace period is waited for here; confirm callers do
+ * not free @zsk while a concurrent lookup may still be walking past it.
+ */
+int tcp_udp_v4_zc_sock_remove(struct zsock *zsk)
+{
+	struct tcp_udp_v4_priv *priv = zsk_priv(zsk);
+	struct zc_sock_bucket *bucket;
+	unsigned long flags;
+	u32 slot;
+
+	if (priv == NULL)
+		return -ENODEV;
+
+	slot = tcp_udp_v4_hash(zsk->handler->sock_bucket_number,
+			priv->src, priv->sport, priv->dst, priv->dport);
+	bucket = &zsk->handler->sock_bucket[slot];
+
+	write_lock_irqsave(&bucket->lock, flags);
+	list_del_rcu(&zsk->zc_entry);
+	write_unlock_irqrestore(&bucket->lock, flags);
+
+	return 0;
+}
+
+/*
+ * Find the zsock whose stored 4-tuple equals (src, sport, dst, dport) and
+ * take a reference on it with zsk_get().  Returns NULL when none matches.
+ *
+ * Must be called under RCU cover and with interrupts disabled.
+ */
+static struct zsock *tcp_udp_v4_zc_sock_lookup(const struct zc_sock_bucket *bucket,
+		const unsigned int bucket_number,
+		const u32 src, const u16 sport, const u32 dst, const u16 dport)
+{
+	u32 slot = tcp_udp_v4_hash(bucket_number, src, sport, dst, dport);
+	struct zsock *zsk;
+
+	list_for_each_entry_rcu(zsk, &bucket[slot].list, zc_entry) {
+		struct tcp_udp_v4_priv *priv = zsk_priv(zsk);
+
+		if (priv->src != src || priv->dst != dst)
+			continue;
+		if (priv->sport != sport || priv->dport != dport)
+			continue;
+
+		zsk_get(zsk);
+		return zsk;
+	}
+
+	return NULL;
+}
+
+/*
+ * Zero-copy receive hook for TCP/UDP over IPv4.
+ *
+ * Parses the Ethernet, IPv4 and TCP/UDP headers in zb->header, then looks
+ * up the zsock bound to the packet's 4-tuple.  If that zsock has a page
+ * ring and an alloc_data callback, the callback places the payload, with
+ * zb->priv pointing at the zsock.
+ *
+ * Returns -EINVAL when the header is not a complete Ethernet/IPv4/TCP or
+ * UDP header, or when no ready zsock matches.  Otherwise it returns the
+ * zc_alloc_data() result, and zb->status is 1 on callback failure, 0 on
+ * success.
+ */
+static int tcp_udp_v4_sendfile_alloc_data(struct zc_handler *zh, struct zc_buf *zb)
+{
+	struct ethhdr *eth;
+	struct iphdr *iph;
+	struct zsock *zsk;
+	unsigned int iph_len, l4_len;
+	int err = -EINVAL;
+	u16 sport, dport;
+	unsigned long flags;
+
+	if (zb->header_size < sizeof(struct ethhdr) + sizeof(struct iphdr))
+		goto err_out_exit;
+
+	eth = zb->header;
+
+	if (eth->h_proto != htons(ETH_P_IP))
+		goto err_out_exit;
+
+	iph = (struct iphdr *)(eth + 1);
+
+	/*
+	 * The L4 header starts after the whole IP header including options,
+	 * so the bounds check must use ihl*4, not sizeof(struct iphdr);
+	 * otherwise a packet with IP options is read past zb->header_size.
+	 */
+	iph_len = iph->ihl * 4;
+	if (iph_len < sizeof(struct iphdr))
+		goto err_out_exit;
+
+	if (iph->protocol == IPPROTO_TCP)
+		l4_len = sizeof(struct tcphdr);
+	else if (iph->protocol == IPPROTO_UDP)
+		l4_len = sizeof(struct udphdr);
+	else
+		goto err_out_exit;
+
+	if (zb->header_size < sizeof(struct ethhdr) + iph_len + l4_len)
+		goto err_out_exit;
+
+	if (iph->protocol == IPPROTO_TCP) {
+		struct tcphdr *tcph = (struct tcphdr *)(((u8 *)iph) + iph_len);
+
+		sport = tcph->source;
+		dport = tcph->dest;
+	} else {
+		struct udphdr *udph = (struct udphdr *)(((u8 *)iph) + iph_len);
+
+		sport = udph->source;
+		dport = udph->dest;
+	}
+
+	local_irq_save(flags);
+	rcu_read_lock();
+	/* The packet's destination is matched against the zsock's src/sport (local end). */
+	zsk = tcp_udp_v4_zc_sock_lookup(zh->sock_bucket, zh->sock_bucket_number,
+			iph->daddr, dport, iph->saddr, sport);
+	if (zsk) {
+		read_lock(&zsk->zc_lock);
+		if (zsk->zc_alloc_data && zsk->zc_pages) {
+			zb->priv = zsk;
+			err = zsk->zc_alloc_data(zb);
+			zb->status = (err) ? 1 : 0;
+			wake_up(&zsk->zc_data_ready);
+		}
+		read_unlock(&zsk->zc_lock);
+		zsk_put(zsk);
+	}
+	rcu_read_unlock();
+	local_irq_restore(flags);
+
+err_out_exit:
+	return err;
+}
+
+/*
+ * Completion hook paired with tcp_udp_v4_sendfile_alloc_data().  It
+ * forwards to the zsock's commit callback under zc_lock and wakes the
+ * sendfile() waiter.
+ *
+ * zb->priv is only set by alloc_data when a matching zsock was found, and
+ * zc_commit_data can be NULL (the tcp_udp_v4_sock_zc_init() error path
+ * clears it).  Both are checked here, just as alloc_data checks
+ * zc_alloc_data.
+ * NOTE(review): the zb->priv check assumes callers initialise it to NULL.
+ *
+ * Returns -ENODEV without a zsock, -EINVAL without a commit callback,
+ * otherwise the callback's result.
+ */
+static int tcp_udp_v4_sendfile_commit_data(struct zc_handler *zh, struct zc_buf *zb)
+{
+	struct zsock *zsk = zb->priv;
+	int err = -EINVAL;
+	unsigned long flags;
+
+	if (!zsk)
+		return -ENODEV;
+
+	read_lock_irqsave(&zsk->zc_lock, flags);
+	if (zsk->zc_commit_data)
+		err = zsk->zc_commit_data(zb);
+	read_unlock_irqrestore(&zsk->zc_lock, flags);
+
+	wake_up(&zsk->zc_data_ready);
+
+	return err;
+}
+
+/*
+ * Validate a zero-copy setup request for TCP/UDP over IPv4.  p->type and
+ * p->size arrive in network byte order.  The payload must be exactly one
+ * struct tcp_udp_v4_priv.
+ *
+ * Its page count must be usable:
+ * - a count of zero would make zc_calc_index() compute an index modulo
+ *   zc_page_num == 0;
+ * - a huge count would overflow the zc_page array allocation.
+ *
+ * Returns 0 when valid, -EINVAL otherwise.
+ */
+static int tcp_udp_v4_sendfile_check(struct zc_handler *zh, struct socket *sock,
+		struct sock_zc_setup_data *p)
+{
+	struct tcp_udp_v4_priv *priv;
+	u32 type = ntohl(p->type);
+	u32 size = ntohl(p->size);
+
+	if (type != IPPROTO_TCP && type != IPPROTO_UDP)
+		return -EINVAL;
+
+	if (size != sizeof(struct tcp_udp_v4_priv))
+		return -EINVAL;
+
+	priv = (struct tcp_udp_v4_priv *)p->data;
+
+	if (priv->pnum <= 0 || priv->pnum > INT_MAX / sizeof(struct zc_page))
+		return -EINVAL;
+
+	return 0;
+}
+
+/*
+ * Setup hook (presumably reached from setsockopt(SO_ZEROCOPY)): validate
+ * the request with tcp_udp_v4_sendfile_check(), then attach the zero-copy
+ * state described by the tcp_udp_v4_priv payload to @sock.
+ */
+static int tcp_udp_v4_sendfile_setup(struct zc_handler *zh, struct socket *sock,
+		struct sock_zc_setup_data *p)
+{
+	int err = tcp_udp_v4_sendfile_check(zh, sock, p);
+
+	if (err)
+		return err;
+
+	return tcp_udp_v4_sock_zc_init(sock, (struct tcp_udp_v4_priv *)p->data);
+}
+
+/* Cleanup hook: unhash @zsk so the receive path can no longer find it.  Always returns 0. */
+static int tcp_udp_v4_sendfile_cleanup(struct zsock *zsk)
+{
+	/* A -ENODEV from a zsock without a tuple is deliberately ignored. */
+	(void)tcp_udp_v4_zc_sock_remove(zsk);
+	return 0;
+}
+
+/*
+ * Initialise every bucket of the TCP/UDP IPv4 socket hash, give the handler
+ * an initial reference count of 1 and register it with zc_add_handler().
+ */
+static int zc_add_tcp(void)
+{
+	struct zc_sock_bucket *b, *end;
+
+	end = tcp_udp_v4_zc_handler.sock_bucket + tcp_udp_v4_zc_handler.sock_bucket_number;
+	for (b = tcp_udp_v4_zc_handler.sock_bucket; b < end; ++b) {
+		INIT_LIST_HEAD(&b->list);
+		rwlock_init(&b->lock);
+	}
+
+	atomic_set(&tcp_udp_v4_zc_handler.refcnt, 1);
+
+	return zc_add_handler(&tcp_udp_v4_zc_handler);
+}
+
+/*
+ * zc_init is registered at device_initcall level; the TCP/UDP IPv4 handler
+ * is added by zc_add_tcp at the later late_initcall level.
+ */
+device_initcall(zc_init);
+late_initcall(zc_add_tcp);
diff --git a/net/socket.c b/net/socket.c
--- a/net/socket.c
+++ b/net/socket.c
@@ -44,6 +44,7 @@
* Tigran Aivazian : sys_send(args) calls sys_sendto(args, NULL, 0)
* Tigran Aivazian : Made listen(2) backlog sanity checks
* protocol-independent
+ * Evgeniy Polyakov: Receiving zero-copy.
*
*
* This program is free software; you can redistribute it and/or
@@ -63,6 +64,7 @@
#include <linux/smp_lock.h>
#include <linux/socket.h>
#include <linux/file.h>
+#include <linux/fs.h>
#include <linux/net.h>
#include <linux/interrupt.h>
#include <linux/netdevice.h>
@@ -84,6 +86,11 @@
#include <linux/compat.h>
#include <linux/kmod.h>
#include <linux/audit.h>
+#include <linux/pagemap.h>
+#include <linux/swap.h>
+#include <linux/writeback.h>
+#include <linux/ip.h>
+#include <linux/tcp.h>
#ifdef CONFIG_NET_RADIO
#include <linux/wireless.h> /* Note : will define WIRELESS_EXT */
@@ -116,6 +123,7 @@ static ssize_t sock_writev(struct file *
unsigned long count, loff_t *ppos);
static ssize_t sock_sendpage(struct file *file, struct page *page,
int offset, size_t size, loff_t *ppos, int more);
+static ssize_t sock_sendfile(struct file *file, loff_t *ppos, size_t count, read_actor_t actor,
void *target);
/*
@@ -136,7 +144,8 @@ static struct file_operations socket_fil
.fasync = sock_fasync,
.readv = sock_readv,
.writev = sock_writev,
- .sendpage = sock_sendpage
+ .sendpage = sock_sendpage,
+ .sendfile = sock_sendfile,
};
/*
@@ -726,6 +735,616 @@ static ssize_t sock_aio_write(struct kio
return __sock_sendmsg(iocb, sock, &x->async_msg, size);
}
+/*
+ * Record @seq as the first sequence number of the stream and lay the page
+ * ring out back to back from it.  Page i starts at @seq plus the sizes of
+ * pages 0..i-1.  Always returns 0.
+ *
+ * sock_zc_setup_seq() calls this with zc_lock held for writing;
+ * zc_sock_alloc_data() calls it from the receive path under the read side.
+ * zc_seq_first == 0 doubles as "not set yet" (see the WARN_ON), so a stream
+ * that really starts at sequence 0 cannot be told apart from an unset one.
+ */
+static int __sock_zc_setup_seq(struct zsock *zsk, u32 seq)
+{
+	struct zc_page *zp, *end;
+	u32 next;
+
+	WARN_ON(zsk->zc_seq_first);
+	zsk->zc_seq_first = seq;
+
+	if (!zsk->zc_pages)
+		return 0;
+
+	next = seq;
+	end = zsk->zc_pages + zsk->zc_page_num;
+	for (zp = zsk->zc_pages; zp < end; zp++) {
+		zp->seq = next;
+		next += zp->size;
+	}
+
+	return 0;
+}
+
+/* Locked wrapper: run __sock_zc_setup_seq() with zc_lock write-held and IRQs disabled. */
+int sock_zc_setup_seq(struct zsock *zsk, u32 seq)
+{
+	unsigned long irqflags;
+	int ret;
+
+	write_lock_irqsave(&zsk->zc_lock, irqflags);
+	ret = __sock_zc_setup_seq(zsk, seq);
+	write_unlock_irqrestore(&zsk->zc_lock, irqflags);
+	return ret;
+}
+
+/*
+ * Find the ring slot whose window [zp->seq, zp->seq + zp->size) contains
+ * @seq.  Returns the slot index, or -1 when @seq precedes the stream start
+ * or no slot is found within zc_page_num probes.
+ *
+ * The search starts at the slot @seq would occupy if every page held
+ * PAGE_SIZE bytes, then walks up or down.  Once it has moved up, it refuses
+ * to turn back down.
+ *
+ * It's a tricky way to lose your data...
+ * To avoid this we must set up the sequence number
+ * using sock_zc_setup_seq() from internals of TCP state machine.
+ *
+ * NOTE(review): "seq < zc_seq_first" is a plain comparison, so TCP
+ * sequence number wraparound is not handled.
+ */
+static int zc_calc_index(struct zsock *zsk, u32 seq)
+{
+	int index, try = 0, inc = 0;
+	struct zc_page *zp;
+
+	if (seq < zsk->zc_seq_first)
+		return -1;
+
+	index = (seq - zsk->zc_seq_first) / PAGE_SIZE;
+	index %= zsk->zc_page_num;
+
+	while (index >= 0 && index < zsk->zc_page_num && try < zsk->zc_page_num) {
+		zp = &zsk->zc_pages[index];
+
+		if (seq >= zp->seq && seq < zp->seq + zp->size)
+			return index;
+
+		/*
+		 * Windows are half-open, so seq == zp->seq + zp->size belongs
+		 * to the next slot.  With '>' that value matched no branch, and
+		 * the loop probed the same slot until the budget ran out.
+		 */
+		if (seq >= zp->seq + zp->size) {
+			inc = 1;
+			index++;
+		} else if (seq < zp->seq) {
+			if (inc)
+				return -1;
+			index--;
+		}
+
+		try++;
+	}
+
+	if (index >= zsk->zc_page_num || try >= zsk->zc_page_num)
+		return -1;
+
+	return index;
+}
+
+/*
+ * Classify the byte range [diff, diff + size) of a page against one
+ * already-recorded range [idx->off, idx->off + idx->size).
+ *
+ * Returns:
+ *   ZC_NEXT      - no overlap (the caller tries the next recorded range);
+ *   ZC_OK        - fully contained, nothing new, *grow = 0;
+ *   ZC_GROW_UP   - starts inside and extends past the end;
+ *   ZC_GROW_DOWN - starts before and ends inside (or touches the start);
+ *   ZC_GROW_BOTH - starts before and extends past the end.
+ * For the GROW verdicts, *grow is the number of bytes not yet covered by @idx.
+ *
+ * NOTE(review): a range spanning two recorded ranges is only compared with
+ * the first match, so *grow may over-count in that case.
+ */
+static inline int zc_check_seq_index(struct zc_index *idx, u16 diff, u16 size, u16 *grow)
+{
+	unsigned int start = diff, end = diff + size;
+	unsigned int istart = idx->off, iend = idx->off + idx->size;
+
+	if (start >= iend || end < istart)
+		return ZC_NEXT;
+
+	if (start >= istart) {
+		/*
+		 * Containment needs the end bound too.  The old test was
+		 * "size <= idx->size", which called [50, 150) contained in
+		 * [0, 100).  zc_sock_alloc_data() then skipped copying the new
+		 * bytes 100..149.
+		 */
+		if (end <= iend) {
+			*grow = 0;
+			return ZC_OK;
+		}
+		*grow = end - iend;
+		return ZC_GROW_UP;
+	}
+
+	if (end >= iend) {
+		/* New bytes lie on both sides of the recorded range. */
+		*grow = (istart - start) + (end - iend);
+		return ZC_GROW_BOTH;
+	}
+
+	*grow = istart - start;
+	return ZC_GROW_DOWN;
+}
+
+/*
+ * Look for a range of @zp that overlaps [diff, diff + size).  The first
+ * ZC_MAX_IDX recorded ranges live in zp->idx[]; later ones are on
+ * zp->idx_list.
+ *
+ * On a match, *idx_grow points at the overlapping range and the
+ * zc_check_seq_index() verdict is returned.  Otherwise *idx_grow is NULL,
+ * *grow is @size (all bytes are new) and ZC_NEXT is returned.
+ */
+static int zc_check_seq(struct zsock *zsk, struct zc_page *zp, u16 diff, u16 size, u16 *grow,
+		struct zc_index **idx_grow)
+{
+	struct zc_index *idx;
+	int i, nr, ret;
+
+	/*
+	 * zc_commit_seq() fills all ZC_MAX_IDX array slots before spilling
+	 * onto the list.  Once idx_num exceeds ZC_MAX_IDX, the array is full
+	 * and must be searched as well; searching only the list missed those
+	 * ranges.
+	 */
+	nr = min_t(int, zp->idx_num, ZC_MAX_IDX);
+	for (i = 0; i < nr; ++i) {
+		idx = &zp->idx[i];
+
+		ret = zc_check_seq_index(idx, diff, size, grow);
+		if (ret != ZC_NEXT) {
+			*idx_grow = idx;
+			return ret;
+		}
+	}
+
+	if (unlikely(zp->idx_num > ZC_MAX_IDX)) {
+		struct zc_index_list_entry *e;
+
+		list_for_each_entry(e, &zp->idx_list, entry) {
+			idx = &e->idx;
+
+			ret = zc_check_seq_index(idx, diff, size, grow);
+			if (ret != ZC_NEXT) {
+				*idx_grow = idx;
+				return ret;
+			}
+		}
+	}
+
+	*idx_grow = NULL;
+	*grow = size;
+
+	return ZC_NEXT;
+}
+
+/*
+ * Record [diff, diff + size) of @zp as received, according to the verdict
+ * @status returned by zc_check_seq():
+ *   ZC_OK   - already recorded, nothing to do;
+ *   ZC_NEXT - append a new range.  The zp->idx[] array is used first; once
+ *             its ZC_MAX_IDX slots are taken, a list entry is allocated
+ *             from idx_pool;
+ *   other   - widen @idx_grow to the union of itself and the new range.
+ * Returns 0, -ENOMEM when a list entry cannot be allocated, or -EINVAL for
+ * a grow verdict without a range to grow.
+ */
+static int zc_commit_seq(struct zsock *zsk, struct zc_page *zp, u16 diff, u16 size, u16 grow,
+		int status, struct zc_index *idx_grow)
+{
+	switch (status) {
+	case ZC_OK:
+		return 0;
+	case ZC_NEXT:
+	{
+		struct zc_index *idx;
+
+		if (likely(zp->idx_num + 1 <= ZC_MAX_IDX)) {
+			idx = &zp->idx[zp->idx_num];
+		} else {
+			struct zc_index_list_entry *e = mempool_alloc(idx_pool, GFP_ATOMIC);
+
+			if (!e)
+				return -ENOMEM;
+			list_add_tail(&e->entry, &zp->idx_list);
+			idx = &e->idx;
+		}
+
+		idx->off = diff;
+		idx->size = size;
+		zp->idx_num++;
+		return 0;
+	}
+	default:
+	{
+		unsigned int start, end;
+
+		if (!idx_grow)
+			return -EINVAL;
+
+		/*
+		 * Store the union of both ranges.  Overwriting the record with
+		 * [diff, diff + size) shrank it for ZC_GROW_UP and ZC_GROW_DOWN,
+		 * so already-received bytes later looked new again.
+		 */
+		start = min_t(unsigned int, idx_grow->off, diff);
+		end = max_t(unsigned int, idx_grow->off + idx_grow->size, diff + size);
+		idx_grow->off = start;
+		idx_grow->size = end - start;
+		return 0;
+	}
+	}
+
+	return 0;
+}
+
+/*
+ * Place the payload described by @zb into the zsock's page ring.
+ *
+ * Installed as zc_alloc_data by tcp_udp_v4_sock_zc_init() and called by
+ * tcp_udp_v4_sendfile_alloc_data() with zsk->zc_lock read-held.
+ *
+ * Sequence numbers:
+ * - TCP: the first packet seen sets zc_seq_first.
+ * - UDP: a synthetic sequence is derived from the ring position.
+ *
+ * The payload is split across ring pages by sequence number.  For each
+ * piece:
+ * - a fragment pointing into the page is appended to @skb;
+ * - the data is copied in through zb->move_data(), unless it is already
+ *   present or the page is ready;
+ * - the page's received-range bookkeeping is updated.
+ *
+ * If a piece cannot be placed, zp->used is rolled back on the pages touched
+ * so far.  Every full page is marked ZC_PAGE_READY, and ZSK_DATA_READY is
+ * raised for the sendfile() waiter.
+ *
+ * Returns 0 on success (or when there is nothing to do), negative on error.
+ */
+int zc_sock_alloc_data(struct zc_buf *zb)
+{
+	struct zsock *zsk = zb->priv;
+	struct zc_page *zp;
+	int err = 0;
+	unsigned int towrite = zb->size, skb_len;
+	struct sk_buff *skb = zb->skb;
+	struct ethhdr *eth;
+	struct iphdr *ip;
+	int index, nocopy, state, need_wakeup = 0;
+	u32 seq, oseq;
+	u16 sz, diff, grow;
+	struct zc_index *idx;
+
+	if (!zsk->zc_pages)
+		goto out;
+
+	eth = (struct ethhdr *)zb->header;
+	ip = (struct iphdr *)(eth+1);
+
+	if (ip->protocol == IPPROTO_TCP) {
+		struct tcphdr *th = (struct tcphdr *)(((u8 *)ip) + ip->ihl*4);
+
+		oseq = seq = ntohl(th->seq);
+
+		/* Payload-less packet: start one past its sequence number (presumably the SYN) - TODO confirm. */
+		if (!towrite)
+			seq = oseq = seq+1;
+
+		/*
+		 * Is it possible to come here using two different paths?
+		 * This means that skb_alloc_zerocopy() is called from different IRQ handlers
+		 * on different CPUs simultaneously for the same zero-copy socket.
+		 *
+		 * If so, then sequence number setup must be done under write lock being held
+		 * using sock_zc_setup_seq() from internals of TCP state machine.
+		 */
+		if (!zsk->zc_seq_first)
+			__sock_zc_setup_seq(zsk, seq);
+	} else if (ip->protocol == IPPROTO_UDP) {
+		oseq = seq = zsk->zc_page_num * zsk->zc_page_index + zsk->zc_seq_first;
+	} else
+		goto out;
+
+	skb_len = skb->len;
+
+	while (towrite > 0) {
+		nocopy = 0;
+		grow = 0;
+
+		index = zc_calc_index(zsk, seq);
+		if (index < 0) {
+			err = -1;
+			break;
+		}
+
+		zp = &zsk->zc_pages[index];
+
+		BUG_ON(seq < zp->seq);
+		BUG_ON(seq >= zp->seq+zp->size);
+
+		diff = seq - zp->seq;
+
+		/* Bounded by the page's free space, the remaining payload and the page end. */
+		sz = min(zp->size - zp->used, towrite);
+		sz = min(zp->size - (zp->page_offset + diff), (unsigned int)sz);
+
+		spin_lock(&zp->lock);
+
+		state = zc_check_seq(zsk, zp, diff, sz, &grow, &idx);
+		if (state == ZC_OK)
+			nocopy = 1;
+
+		if (test_bit(ZC_PAGE_READY, &zp->flags) || (zp->size == zp->used))
+			nocopy = 1;
+		if (zp->size - zp->used < towrite && !zb->move_data) {
+			err = -1;
+			goto unlock;
+		}
+		if (unlikely(skb_shinfo(skb)->nr_frags >= MAX_SKB_FRAGS)) {
+			err = -ENOMEM;
+			goto unlock;
+		}
+
+		/*
+		 * Setup fragment with offset to point to the area where
+		 * we actually can write without overwriting old data.
+		 * Setup fragment size to be equal not to the real data size,
+		 * but size of the area where we actually can write data into.
+		 */
+		skb_fill_page_desc(skb, skb_shinfo(skb)->nr_frags, zp->page, zp->page_offset+diff, sz);
+
+		if (zb->move_data) {
+			if (nocopy)
+				err = sz;
+			else
+				err = zb->move_data(zb, skb->len - skb_len, sz);
+
+			if (err <= 0)
+				goto unlock;
+		} else
+			err = zb->size;
+
+		if (zc_commit_seq(zsk, zp, diff, sz, grow, state, idx)) {
+			err = -1;
+			goto unlock;
+		}
+
+		skb->len += err;
+		skb->data_len += err;
+		skb->truesize += err;
+
+		towrite -= err;
+
+		/*
+		 * NOTE(review): for ZC_OK (range already recorded, nothing
+		 * copied) this adds the whole piece to zp->used rather than
+		 * grow (0); confirm that is intended.
+		 */
+		zp->used += (state == ZC_OK)?err:grow;
+		seq += err;
+
+		BUG_ON(zp->used > zp->size);
+
+		err = 0;
+
+unlock:
+		spin_unlock(&zp->lock);
+		if (err < 0)
+			break;
+	}
+
+	/*
+	 * Error happens when part or the whole packet can not be moved into some page.
+	 * It is most likely due to the fact, that sendfile() still has not committed
+	 * selected pages back to VFS.
+	 * Or sequence number is completely bogus.
+	 *
+	 * In case of uncommitted page, we very likely caught following problem:
+	 * part of the packet has been written into the previous page, but next page
+	 * contains old data which is not committed to VFS, and we can not overwrite them.
+	 * In this case we must fallback all writes to the previous pages, so we start
+	 * from the beginning, select one by one the same pages as were selected for writing,
+	 * and decrease their zp->used counters, so pages start looking like they were before.
+	 *
+	 * NOTE(review): the forward pass added grow (or err for ZC_OK) to zp->used,
+	 * while this pass subtracts the piece size; the two only agree when every
+	 * piece was entirely new.
+	 */
+	if (err < 0) {
+		towrite = zb->size - towrite;
+
+		while (towrite) {
+			index = zc_calc_index(zsk, oseq);
+			if (index < 0) {
+				err = -1;
+				break;
+			}
+			zp = &zsk->zc_pages[index];
+
+			/*
+			 * Check the position being rolled back (oseq), not the
+			 * packet start: from the second page on, the start lies
+			 * below zp->seq and the old check hit BUG_ON().
+			 */
+			BUG_ON(oseq < zp->seq);
+			BUG_ON(oseq >= zp->seq+zp->size);
+
+			spin_lock(&zp->lock);
+			diff = oseq - zp->seq;
+			sz = min(zp->size - (zp->page_offset + diff), towrite);
+			zp->used -= sz;
+			spin_unlock(&zp->lock);
+
+			towrite -= sz;
+			oseq += sz;
+		}
+
+		need_wakeup = 1;
+	}
+
+	for (index=0; index<zsk->zc_page_num; ++index) {
+		zp = &zsk->zc_pages[index];
+
+		if (zp->used == zp->size) {
+			need_wakeup = 1;
+			/*
+			 * Advance zc_page_index only when a page first becomes ready.
+			 * A full page stays full until sendfile() commits and
+			 * re-prepares it, and advancing on every call skewed the index.
+			 */
+			if (!test_and_set_bit(ZC_PAGE_READY, &zp->flags) &&
+			    ++zsk->zc_page_index == zsk->zc_page_num)
+				zsk->zc_page_index = 0;
+		}
+	}
+
+	if (need_wakeup)
+		set_bit(ZSK_DATA_READY, &zsk->zc_flags);
+
+out:
+	return err;
+}
+
+/*
+ * Commit hook installed by tcp_udp_v4_sock_zc_init().
+ *
+ * Returns -1 when the zsock has no page ring.  Returns 1 when @zb's size
+ * differs from that of the page at zc_page_index.  Otherwise, a full
+ * current page is marked ZC_PAGE_READY and the index advances (wrapping at
+ * zc_page_num).  ZSK_DATA_READY is then raised and 0 returned.
+ */
+int zc_sock_commit_data(struct zc_buf *zb)
+{
+	struct zsock *zsk = zb->priv;
+	struct zc_page *cur;
+
+	if (zsk->zc_pages == NULL)
+		return -1;
+
+	cur = zsk->zc_pages + zsk->zc_page_index;
+	if (unlikely(cur->size != zb->size))
+		return 1;
+
+	if (cur->used == cur->size) {
+		set_bit(ZC_PAGE_READY, &cur->flags);
+		zsk->zc_page_index++;
+		if (zsk->zc_page_index == zsk->zc_page_num)
+			zsk->zc_page_index = 0;
+	}
+
+	set_bit(ZSK_DATA_READY, &zsk->zc_flags);
+	return 0;
+}
+
+/*
+ * This should process all socket's related stuff,
+ * for example emit TCP ACKs...
+ * Since zero-copy skb can only have valid header,
+ * this should process that header at skb->data.
+ * skb_copy_datagram_iovec() is changed to not even touch
+ * zero-copied skb.
+ *
+ * The bytes received into message_buf are never used, so one scratch
+ * buffer is shared by every caller.
+ */
+static u8 message_buf[PAGE_SIZE];
+
+/*
+ * Receive up to @ack_size bytes (at most sizeof(message_buf)) from @sock
+ * without blocking.  Returns the kernel_recvmsg() result.
+ */
+static int receive_message(struct socket *sock, unsigned int ack_size)
+{
+	struct msghdr msg;
+	struct kvec iov;
+
+	/*
+	 * This runs while ring pages sit prepared for writing (presumably
+	 * locked by prepare_page()), so socket allocations must not recurse
+	 * into I/O.  OR-ing GFP_NOIO into the default GFP_KERNEL mask changed
+	 * nothing, because GFP_NOIO is a subset of it; the mask must be
+	 * replaced instead.
+	 */
+	sock->sk->sk_allocation = GFP_NOIO;
+
+	iov.iov_base = message_buf;
+	iov.iov_len = min(ack_size, (unsigned int)sizeof(message_buf));
+
+	msg.msg_name = NULL;
+	msg.msg_namelen = 0;
+	msg.msg_control = NULL;
+	msg.msg_controllen = 0;
+	msg.msg_flags = MSG_DONTWAIT;
+
+	return kernel_recvmsg(sock, &msg, &iov, 1, iov.iov_len, msg.msg_flags);
+}
+
+/*
+ * Turn @sock into a zero-copy receiver that writes into the file behind
+ * priv->fd through a ring of priv->pnum page-cache pages.
+ *
+ * Steps:
+ * - check the file: writable, not already in zero-copy mode, has file
+ *   operations, passes rw_verify_area() and the security hook;
+ * - allocate a zsock carrying @priv;
+ * - prepare the page ring starting at file offset 0;
+ * - install the zc_sock_alloc_data()/zc_sock_commit_data() callbacks;
+ * - hash the zsock and attach it to sock->sk->zsk.
+ *
+ * On success, the zsock keeps the fget() file reference in zc_file.
+ * Returns 0 or a negative errno; on failure everything acquired here is
+ * released.
+ */
+int tcp_udp_v4_sock_zc_init(struct socket *sock, struct tcp_udp_v4_priv *priv)
+{
+	struct file *file;
+	struct zsock *zsk;
+	struct zc_page *zc_pages, *zp;
+	int pnum_max, err, i;
+	unsigned long flags;
+	struct address_space *mapping;
+	size_t count;
+
+	/*
+	 * Sane setup.
+	 */
+	count = INT_MAX;
+	pnum_max = priv->pnum;
+
+	if (!sock->sk)
+		return -EINVAL;
+
+	/*
+	 * The ring needs at least one page, because zc_calc_index() reduces
+	 * the index modulo zc_page_num.  Its array size must not overflow.
+	 */
+	if (pnum_max <= 0 || (size_t)pnum_max > INT_MAX / sizeof(struct zc_page))
+		return -EINVAL;
+
+	err = -EBADF;
+	file = fget(priv->fd);
+	if (!file)
+		goto err_out_exit;
+	if (!(file->f_mode & FMODE_WRITE))
+		goto err_out_fput;
+	/* Another zsock owns this file: bail out without touching its flag. */
+	err = -ETXTBSY;
+	if (file->f_mode & FMODE_ZEROCOPY)
+		goto err_out_fput;
+	err = -EINVAL;
+	if (!file->f_op)
+		goto err_out_fput;
+
+	err = rw_verify_area(WRITE, file, &file->f_pos, count);
+	if (err)
+		goto err_out_fput;
+
+	err = security_file_permission(file, MAY_WRITE);
+	if (err)
+		goto err_out_fput;
+
+	err = -ENOMEM;
+	zsk = zsk_alloc(&tcp_udp_v4_zc_handler, priv, sizeof(*priv), NULL, GFP_KERNEL);
+	if (!zsk)
+		goto err_out_fput;
+
+	mapping = file->f_mapping;
+
+	zc_pages = kzalloc(sizeof(struct zc_page) * pnum_max, GFP_KERNEL);
+	if (!zc_pages)
+		goto err_out_zsk_put;
+
+	pagevec_init(&zsk->zc_lru_pvec, 0);
+	file->f_pos = 0;
+
+	for (i = 0; i < pnum_max; ++i) {
+		zp = &zc_pages[i];
+
+		spin_lock_init(&zp->lock);
+		err = prepare_page(zp, zsk, file, mapping, &zsk->zc_pos, count, &zsk->zc_lru_pvec);
+		if (unlikely(err))
+			goto err_out_commit_pages;
+	}
+
+	file->f_mode |= FMODE_ZEROCOPY;
+
+	write_lock_irqsave(&zsk->zc_lock, flags);
+	zsk->zc_file = file;
+	zsk->zc_pages = zc_pages;
+	zsk->zc_page_num = pnum_max;
+	zsk->zc_page_index = 0;
+	zsk->zc_alloc_data = &zc_sock_alloc_data;
+	zsk->zc_commit_data = &zc_sock_commit_data;
+	zsk->sk = sock->sk;
+	write_unlock_irqrestore(&zsk->zc_lock, flags);
+
+	err = tcp_udp_v4_zc_sock_insert(zsk);
+	if (err)
+		goto err_out_unpublish;
+
+	sock->sk->zsk = zsk;
+
+	return 0;
+
+err_out_unpublish:
+	/*
+	 * Undo the state published above.  This used to sit unreachable
+	 * after "return 0".  FMODE_ZEROCOPY is only cleared here, where this
+	 * call set it, and no longer on the -ETXTBSY path.
+	 */
+	write_lock_irqsave(&zsk->zc_lock, flags);
+	zsk->zc_file = NULL;
+	zsk->zc_pages = NULL;
+	zsk->zc_page_num = 0;
+	zsk->zc_page_index = 0;
+	zsk->zc_alloc_data = NULL;
+	zsk->zc_commit_data = NULL;
+	zsk->sk = NULL;
+	write_unlock_irqrestore(&zsk->zc_lock, flags);
+	file->f_mode &= ~FMODE_ZEROCOPY;
+	i = pnum_max;
+err_out_commit_pages:
+	for (--i; i >= 0; --i)
+		commit_page(&zc_pages[i], file, mapping);
+
+	/* Release the same page references the zero-copy teardown path releases. */
+	if (zsk->zc_cached_page) {
+		page_cache_release(zsk->zc_cached_page);
+		zsk->zc_cached_page = NULL;
+	}
+	pagevec_lru_add(&zsk->zc_lru_pvec);
+
+	kfree(zc_pages);
+err_out_zsk_put:
+	zsk_put(zsk);
+err_out_fput:
+	fput(file);
+err_out_exit:
+	return err;
+}
+
+/*
+ * ->sendfile() for sockets, used in reverse.  @in_file is a socket set up
+ * with tcp_udp_v4_sock_zc_init(), and @target is the destination file.
+ *
+ * The receive path fills the zsock's page ring directly.  Each round of
+ * this loop:
+ * - waits up to 5 seconds for ZSK_DATA_READY;
+ * - commits every ZC_PAGE_READY page to the file and re-prepares it;
+ * - drains the socket with receive_message().
+ *
+ * The loop stops after @count bytes or on a pending signal; the
+ * zero-copy state is then torn down.  Returns the number of bytes written,
+ * or a negative error.
+ */
+static ssize_t sock_sendfile(struct file *in_file, loff_t *ppos, size_t count, read_actor_t actor,
+		void *target)
+{
+	struct socket *sock;
+	struct sock *sk;
+	ssize_t err = 0;
+	size_t written = 0;
+	struct file *file = target;
+	struct address_space *mapping = file->f_mapping;
+	struct inode *inode = mapping->host;
+	int i;
+	unsigned int ack_size;
+	struct zsock *zsk;
+
+	if (!count)
+		return 0;
+
+	sock = SOCKET_I(in_file->f_dentry->d_inode);
+
+	if (!sock || !sock->sk || !sock->sk->zsk)
+		return -ENODEV;
+	sk = sock->sk;
+	zsk = sk->zsk;
+	/* Detach from the socket: this call now owns the zero-copy state. */
+	sk->zsk = NULL;
+
+	err = generic_write_checks(file, &zsk->zc_pos, &count, S_ISBLK(inode->i_mode));
+	if (err)
+		goto err_out_exit;
+
+	if (!zsk->zc_pages) {
+		err = -EINVAL;
+		goto err_out_exit;
+	}
+
+	zsk_get(zsk);
+
+	while (count) {
+		struct zc_page *zp;
+
+		wait_event_interruptible_timeout(zsk->zc_data_ready,
+				test_and_clear_bit(ZSK_DATA_READY, &zsk->zc_flags), 5*HZ);
+
+		for (i = 0; i < zsk->zc_page_num; ++i) {
+			zp = &zsk->zc_pages[i];
+
+			if (!test_bit(ZC_PAGE_READY, &zp->flags))
+				continue;
+
+			err = commit_page(zp, file, mapping);
+			if (err)
+				goto err_out_release_all_pages;
+
+			/* count is a size_t: clamp at zero instead of wrapping to a huge value. */
+			count -= min_t(size_t, count, zp->used);
+			written += zp->used;
+
+			/*
+			 * NOTE(review): a prepare_page() failure is still ignored here;
+			 * confirm how the teardown path treats a page whose prepare failed.
+			 */
+			prepare_page(zp, zsk, file, mapping, &zsk->zc_pos, count, &zsk->zc_lru_pvec);
+		}
+
+		if (signal_pending(current))
+			break;
+
+		ack_size = zsk->zc_page_num * sizeof(message_buf);
+		while ((err = receive_message(sock, ack_size)) > 0)
+			ack_size -= err;
+	}
+
+	*ppos = written;
+	err = written;
+
+err_out_release_all_pages:
+err_out_exit:
+	sk_zc_fini(zsk);
+	zc_cleanup(zsk);
+	return err;
+}
+
static ssize_t sock_sendpage(struct file *file, struct page *page,
int offset, size_t size, loff_t *ppos, int more)
{
--
Evgeniy Polyakov
#include <sys/sendfile.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/time.h>
#include <fcntl.h>
#include <netdb.h>
#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <errno.h>
#include <unistd.h>
#include <signal.h>
#include <linux/socket.h>
#include <linux/types.h>
#include <net/zerocopy.h>
/* Receive modes selected with -t: zero-copy sendfile() or a recv()/write() loop. */
#define TYPE_SENDFILE 0
#define TYPE_RW 1
/* Print a message to stderr followed by strerror(errno) and the numeric errno. */
#define ulog_err(f, a...) fprintf(stderr, f ": %s [%d].\n", ##a, strerror(errno), errno)
/* Print a plain message to stderr. */
#define ulog(f, a...) fprintf(stderr, f, ##a)
/*
 * Set by the SIGINT handler and polled by the transfer loops.  An object
 * written from a signal handler must be volatile sig_atomic_t: a plain int
 * is undefined behaviour there, and the loops could keep a cached copy.
 */
static volatile sig_atomic_t need_exit;

/* SIGINT handler: store the signal number so the transfer loops stop. */
void SIGINT_h(int signo)
{
	need_exit = signo;
}
/* Pack four dotted-quad octets, first octet most significant, into a host-order uint32_t. */
static inline uint32_t num2ip(uint8_t a, uint8_t b, uint8_t c, uint8_t d)
{
	const uint8_t octets[4] = { a, b, c, d };
	uint32_t ip = 0;
	unsigned int k;

	for (k = 0; k < sizeof(octets); k++)
		ip = (ip << 8) | octets[k];
	return ip;
}
int create_socket(char *addr, unsigned short port, char *bind_addr, unsigned short bind_port, int
in, int pnum, int type)
{
int s, err;
struct hostent *h, *bh;
struct sockaddr_in sa, bsa;
ulog("%s:%u -> %s:%u.\n", bind_addr, bind_port, addr, port);
h = gethostbyname(addr);
if (!h) {
ulog_err("gethostbyname %s", addr);
return -1;
}
sa.sin_family = AF_INET;
sa.sin_port = htons(port);
memcpy(&sa.sin_addr.s_addr, h->h_addr_list[0], 4);
bh = gethostbyname(bind_addr);
if (!bh) {
ulog_err("gethostbyname %s", bind_addr);
return -1;
}
bsa.sin_family = AF_INET;
bsa.sin_port = htons(bind_port);
memcpy(&bsa.sin_addr.s_addr, bh->h_addr_list[0], 4);
s = socket(AF_INET, SOCK_STREAM, 0);
if (s == -1) {
ulog_err("Failed to create a socket");
return -1;
}
if (type == TYPE_SENDFILE) {
unsigned char buf[sizeof(struct sock_zc_setup_data) + sizeof(struct tcp_udp_v4_priv)];
struct sock_zc_setup_data *setup_data;
struct tcp_udp_v4_priv *priv;
setup_data = (struct sock_zc_setup_data *)buf;
priv = (struct tcp_udp_v4_priv *)(setup_data + 1);
setup_data->type = htonl(IPPROTO_TCP);
setup_data->size = htonl(sizeof(struct tcp_udp_v4_priv));
priv->dst = sa.sin_addr.s_addr;
priv->dport = htons(port);
priv->src = bsa.sin_addr.s_addr;
priv->sport = htons(bind_port);
priv->fd = in;
priv->pnum = pnum;
ulog("%08x -> %08x.\n", priv->src, priv->dst);
err = setsockopt(s, SOL_SOCKET, SO_ZEROCOPY, setup_data, sizeof(struct sock_zc_setup_data) +
sizeof(struct tcp_udp_v4_priv));
if (err) {
ulog_err("Failed to setup zero-copy socket");
return err;
}
}
if (bind(s, (struct sockaddr *)&bsa, sizeof(bsa)) == -1) {
ulog_err("bind");
close(s);
return -1;
}
if (connect(s, (struct sockaddr *)&sa, sizeof(sa)) == -1) {
ulog_err("connect");
close(s);
return -1;
}
return s;
}
/*
 * Baseline receive path.  recv() from socket @s into a 4 KiB buffer and
 * write() each chunk to fd @in.  Stops when the peer closes, on a recv() or
 * write() error, or on SIGINT.  @count is not used.
 *
 * Returns the number of bytes received, or -ENOMEM if the buffer cannot be
 * allocated.
 */
static ssize_t test_write(int in, int s, int count)
{
	char *buf, *ptr;
	ssize_t err, err1;
	int sz = 4096;
	ssize_t bytes = 0;

	(void)count;

	buf = malloc(sz);
	if (!buf) {
		ulog("Failed to allocate buffer of %d bytes.\n", sz);
		return -ENOMEM;
	}

	while (!need_exit) {
		err = recv(s, buf, sz, 0);
		if (err == 0)
			break;	/* orderly shutdown by the peer, not an error */
		if (err < 0) {
			ulog_err("recv");
			break;
		}

		bytes += err;

		/* write() may be partial; stop reporting progress on a write failure. */
		ptr = buf;
		while (err > 0) {
			err1 = write(in, ptr, err);
			if (err1 <= 0) {
				ulog_err("write");
				free(buf);
				return bytes;
			}
			err -= err1;
			ptr += err1;
		}
	}

	free(buf);
	return bytes;
}
/*
 * Zero-copy receive path: call sendfile() repeatedly, with the destination
 * file @in as out_fd and the SO_ZEROCOPY socket @s as in_fd; the patched
 * socket ->sendfile() handles this direction.  Stops when sendfile() fails
 * or returns 0, or on SIGINT.  Returns the total bytes sendfile() reported.
 */
static ssize_t test_sendfile(int in, int s, int count)
{
	ssize_t err;	/* sendfile() returns ssize_t; an int could truncate it */
	ssize_t bytes = 0;

	while (!need_exit) {
		err = sendfile(in, s, NULL, count);
		if (err <= 0) {
			ulog_err("sendfile");
			break;
		}
		bytes += err;
	}

	return bytes;
}
/* Print the command-line options accepted by main()'s getopt() string to stderr. */
static void usage(const char *p)
{
	ulog("Usage: %s -a addr -p port -b bind_addr -B bind_port -f file -t type [-n pages] [-h]\n"
	     "  -a addr       remote address to connect to\n"
	     "  -p port       remote port\n"
	     "  -b bind_addr  local address to bind to\n"
	     "  -B bind_port  local port to bind to\n"
	     "  -f file       output file (created/truncated)\n"
	     "  -t type       0: zero-copy sendfile() receive, 1: recv()/write() loop\n"
	     "  -n pages      zero-copy page ring size (default 32)\n"
	     "  -h            show this help\n", p);
}
/*
 * Command-line driver.  Receives a stream from -a/-p, with the local end
 * bound to -b/-B, into file -f using either zero-copy sendfile() (-t 0) or
 * a recv()/write() loop (-t 1).  Then reports bytes, throughput and
 * elapsed time.
 */
int main(int argc, char *argv[])
{
	int s, in, ch;
	size_t count = 1024*1024*1000;
	int type, pnum;
	ssize_t bytes;
	char *addr, *bind_addr, *file;
	unsigned short port, bind_port;
	long utime;
	double speed;
	struct timeval tm1, tm2;

	addr = bind_addr = file = NULL;
	port = bind_port = 0;
	type = -1;
	pnum = 32;

	while ((ch = getopt(argc, argv, "a:p:f:t:b:B:n:h")) != -1) {
		switch (ch) {
		case 'a':
			addr = optarg;
			break;
		case 'p':
			port = atoi(optarg);
			break;
		case 'b':
			bind_addr = optarg;
			break;
		case 'B':
			bind_port = atoi(optarg);
			break;
		case 'f':
			file = optarg;
			break;
		case 't':
			type = atoi(optarg);
			break;
		case 'n':
			pnum = atoi(optarg);
			break;
		case 'h':
		default:
			usage(argv[0]);
			return -1;
		}
	}

	if (!file || !addr || !bind_addr || !port || !bind_port || type == -1) {
		usage(argv[0]);
		return -1;
	}

	/* pnum becomes the kernel's page-ring size; a non-positive ring is rejected here. */
	if (type == TYPE_SENDFILE && pnum <= 0) {
		ulog("Number of pages must be positive, got %d.\n", pnum);
		return -1;
	}

	signal(SIGINT, SIGINT_h);

	in = open(file, O_RDWR | O_TRUNC | O_CREAT, 0644);
	if (in == -1) {
		ulog_err("open");
		return -1;
	}

	s = create_socket(addr, port, bind_addr, bind_port, in, pnum, type);
	if (s < 0) {
		close(in);
		return s;
	}

	gettimeofday(&tm1, NULL);
	switch (type) {
	case TYPE_SENDFILE:
		bytes = test_sendfile(in, s, count);
		break;
	default:
	case TYPE_RW:
		bytes = test_write(in, s, count);
		break;
	}
	gettimeofday(&tm2, NULL);

	utime = (tm2.tv_sec - tm1.tv_sec) * 1000000 + tm2.tv_usec - tm1.tv_usec;
	/* Avoid dividing by a zero-length interval. */
	speed = (utime > 0) ? ((double)bytes * 1000000) / ((double)utime * 1024 * 1024) : 0.0;
	ulog("transferred:%zd bytes, speed:%f Mb/sec, time:%ld usec.\n",
	     bytes, speed, utime);

	close(s);
	close(in);
	return 0;
}