Re: using splice/vmsplice to improve file receive performance

From: saeed bishara
Date: Fri Dec 22 2006 - 06:18:44 EST


On 12/22/06, Jens Axboe <jens.axboe@xxxxxxxxxx> wrote:
On Thu, Dec 21 2006, saeed bishara wrote:
> Hi,
> I'm trying to use the splice/vmsplice system calls to improve the
> samba server write throughput, but before touching the smbd, I started
> to improve the ttcp tool since it simple and has the same flow. I'm
> expecting to avoid the "copy_from_user" path when using those
> syscalls.
> so far, I couldn't make any improvement, actually the throughput get
> worst. the new receive flow looks like this (code also attached):
> 1. read tcp packet (64 pages) to page aligned buffer.
> 2. vmsplice the buffer to pipe with SPLICE_F_MOVE.
> 3. splice the pipe to the file, also with SPLICE_F_MOVE.
>
> the strace shows that the splice takes a lot of time. also when
> profiling the kernel, I found that the memcpy() called to often !!

(didn't see this until now, axboe@xxxxxxx doesn't work anymore)

I'm assuming that you mean you vmsplice with SPLICE_F_GIFT, to hand
ownership of the pages to the kernel (in which case SPLICE_F_MOVE will
work, otherwise you get a copy)? If not, that'll surely cost you a data
copy
I'll try the vmplice with SPLICE_F_GIFT and splice with MOVE. btw,
I noticed that the splice system call takes the bulk of the time,
does it mean anything?

This sounds remarkably like a recent thread on lkml, you may want to
read up on that. Basically using splice for network receive is a bit of
a work-around now, since you do need the one copy and then vmsplice that
into a pipe. To realize the full potential of splice, we first need
socket receive support so you can skip that step (splice from socket to
pipe, splice pipe to file).
Ashwini Kulkarni posted patches that implements that, see
http://lkml.org/lkml/2006/9/20/272 . is that right?

There was no test code attached, btw.
sorry, here it is.
can you please add sample application to your test tools (splice,fio
,,) that demonstrates my flow; socket to file using read & vmsplice?

--
Jens Axboe


/*
* T T C P . C
*
* Test TCP connection. Makes a connection on port 2000
* and transfers zero buffers or data copied from stdin.
*
* Usable on 4.2, 4.3, and 4.1a systems by defining one of
* BSD42 BSD43 (BSD41a)
*
* Modified for operation under 4.2BSD, 18 Dec 84
* T.C. Slattery, USNA
* Minor improvements, Mike Muuss and Terry Slattery, 16-Oct-85.
*
* Mike Muuss and Terry Slattery have released this code to the Public Domain.
*/


#define BSD43
/* #define BSD42 */
/* #define BSD41a */

#include <stdio.h>
#include <stdlib.h>
#include <strings.h>
#include <string.h>
#include <unistd.h>
#include <ctype.h>
#include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netdb.h>
#include <sys/time.h> /* struct timeval */

#ifdef SYSV
#include <sys/times.h>
#include <sys/param.h>
#else
#include <sys/resource.h>
#endif

#include "splice.h"

#ifndef MAX_SPLICE_SIZE
#define MAX_SPLICE_SIZE (64 * 1024)
#endif

#define VM_BUFFERS 2
#define ALIGN(value,size) (((value) + (size) - 1) & ~((size) - 1))

struct sockaddr_in sinme;
struct sockaddr_in sinhim;
struct sockaddr_in sindum;
struct sockaddr_in frominet;

int domain, fromlen;
int fd; /* fd of network socket */

int pfds/*[VM_BUFFERS]*/[2]; /* fds for pipe */
int buflen = 1024; /* length of buffer */
char *buf; /* ptr to dynamic buffer */
int nbuf = 1024; /* number of buffers to send in sinkmode */

size_t pipe_buf_size;
char *pipe_buf;

int udp = 0; /* 0 = tcp, !0 = udp */
int options = 0; /* socket options */
int one = 1; /* for 4.3 BSD style setsockopt() */
short port = 2000; /* TCP port number */
char *host; /* ptr to name of host */
int trans; /* 0=receive, !0=transmit mode */
int sinkmode; /* 0=normal I/O, !0=sink/source mode */
int splicemode; /* 0=normal I/O, !0=splice mode */

struct hostent *addr;
extern int errno;

char Usage[] = "\
Usage: ttcp -t [-options] host <in\n\
-l## length of bufs written to network (default 1024)\n\
-s source a pattern to network\n\
-n## number of bufs written to network (-s only, default 1024)\n\
-p## port number to send to (default 2000)\n\
-u use UDP instead of TCP\n\
Usage: ttcp -r [-options] >out\n\
-l## length of network read buf (default 1024)\n\
-s sink (discard) all data from network\n\
-z use the splice syscall\n\
-p## port number to listen at (default 2000)\n\
-B Only output full blocks, as specified in -l## (for TAR)\n\
-u use UDP instead of TCP\n\
";

char stats[128];
double t; /* transmission time */
long nbytes; /* bytes on net */
int b_flag = 0; /* use mread() */

void prep_timer();
double read_timer();
double cput, realt; /* user, real time (seconds) */
void err(char *s);
void mes(char *s);
void pattern(register char *cp,
register int cnt);
int Nread( int fd, char *buf, int count );
int Nwrite( int fd, char *buf, int count );
int mread(int fd, char *bufp, unsigned int n);
void setup_vmsplice();
uint64_t vmsplice_recv(int sd, int fd);
//int delay(int us);

int
main(argc,argv)
int argc;
char **argv;
{
unsigned long addr_tmp;

if (argc < 2) goto usage;

argv++; argc--;
while( argc>0 && argv[0][0] == '-' ) {
switch (argv[0][1]) {

case 'B':
b_flag = 1;
break;
case 't':
trans = 1;
break;
case 'r':
trans = 0;
break;
case 'd':
options |= SO_DEBUG;
break;
case 'n':
nbuf = atoi(&argv[0][2]);
break;
case 'l':
buflen = atoi(&argv[0][2]);
break;
case 's':
sinkmode = 1; /* source or sink, really */
break;
case 'z':
splicemode = 1; /* splice mode */
break;
case 'p':
port = atoi(&argv[0][2]);
break;
case 'u':
udp = 1;
break;
default:
goto usage;
}
argv++; argc--;
}
if(trans) {
/* xmitr */
if (argc != 1) goto usage;
bzero((char *)&sinhim, sizeof(sinhim));
host = argv[0];
if (atoi(host) > 0 ) {
/* Numeric */
sinhim.sin_family = AF_INET;
#ifdef cray
addr_tmp = inet_addr(host);
sinhim.sin_addr = addr_tmp;
#else
sinhim.sin_addr.s_addr = inet_addr(host);
#endif
} else {
if ((addr=gethostbyname(host)) == NULL)
err("bad hostname");
sinhim.sin_family = addr->h_addrtype;
bcopy(addr->h_addr,(char*)&addr_tmp, addr->h_length);
#ifdef cray
sinhim.sin_addr = addr_tmp;
#else
sinhim.sin_addr.s_addr = addr_tmp;
#endif
}
sinhim.sin_port = htons(port);
sinme.sin_port = 0; /* free choice */
} else {
/* rcvr */
sinme.sin_port = htons(port);
}

if( (buf = (char *)malloc(buflen)) == (char *)NULL)
err("malloc");
fprintf(stderr,"ttcp%s: nbuf=%d, buflen=%d, port=%d\n",
trans?"-t":"-r",
nbuf, buflen, port);

if ((fd = socket(AF_INET, udp?SOCK_DGRAM:SOCK_STREAM, 0)) < 0)
err("socket");
mes("socket");

if (bind(fd, &sinme, sizeof(sinme)) < 0)
err("bind");

if (!udp) {
if (trans) {
/* We are the client if transmitting */
if(options) {
#ifdef BSD42
if( setsockopt(fd, SOL_SOCKET, options, 0, 0) < 0)
#else
if( setsockopt(fd, SOL_SOCKET, options, &one, sizeof(one)) < 0)
#endif
err("setsockopt");
}
if(connect(fd, &sinhim, sizeof(sinhim) ) < 0)
err("connect");
mes("connect");
} else {
/* otherwise, we are the server and
* should listen for the connections
*/
listen(fd,0); /* allow a queue of 0 */
if(options) {
#ifdef BSD42
if( setsockopt(fd, SOL_SOCKET, options, 0, 0) < 0)
#else
if( setsockopt(fd, SOL_SOCKET, options, &one, sizeof(one)) < 0)
#endif
err("setsockopt");
}
fromlen = sizeof(frominet);
domain = AF_INET;
if((fd=accept(fd, &frominet, &fromlen) ) < 0)
err("accept");
mes("accept");
}
}

if (splicemode){
pipe_buf_size = buflen;
setup_vmsplice();
}
prep_timer();
errno = 0;
if (sinkmode) {
register int cnt;
if (trans) {
pattern( buf, buflen );
if(udp) (void)Nwrite( fd, buf, 4 ); /* rcvr start */
while (nbuf-- && Nwrite(fd,buf,buflen) == buflen)
nbytes += buflen;
if(udp) (void)Nwrite( fd, buf, 4 ); /* rcvr end */
} else {
while ((cnt=Nread(fd,buf,buflen)) > 0) {
static int going = 0;
if( cnt <= 4 ) {
if( going )
break; /* "EOF" */
going = 1;
prep_timer();
} else
nbytes += cnt;
}
}
} else {
register int cnt;
if (trans) {
while((cnt=read(0,buf,buflen)) > 0 &&
Nwrite(fd,buf,cnt) == cnt)
nbytes += cnt;
} else {
if(splicemode)
{
while((cnt=vmsplice_recv(fd, 1)) > 0)
nbytes += cnt;
}
else
{
while((cnt=Nread(fd,buf,buflen)) > 0 &&
write(1,buf,cnt) == cnt)
nbytes += cnt;
}
}
}
if(errno) err("IO");
(void)read_timer(stats,sizeof(stats));
if(udp&&trans) {
(void)Nwrite( fd, buf, 4 ); /* rcvr end */
(void)Nwrite( fd, buf, 4 ); /* rcvr end */
(void)Nwrite( fd, buf, 4 ); /* rcvr end */
(void)Nwrite( fd, buf, 4 ); /* rcvr end */
}
fprintf(stderr,"ttcp%s: %s\n", trans?"-t":"-r", stats);
if( cput <= 0.0 ) cput = 0.001;
if( realt <= 0.0 ) realt = 0.001;
fprintf(stderr,"ttcp%s: %ld bytes processed\n",
trans?"-t":"-r", nbytes );
fprintf(stderr,"ttcp%s: %9g CPU sec = %9g KB/cpu sec, %9g Kbits/cpu sec\n",
trans?"-t":"-r",
cput,
((double)nbytes)/cput/1024,
((double)nbytes)*8/cput/1024 );
fprintf(stderr,"ttcp%s: %9g real sec = %9g KB/real sec, %9g Kbits/sec\n",
trans?"-t":"-r",
realt,
((double)nbytes)/realt/1024,
((double)nbytes)*8/realt/1024 );
exit(0);

usage:
fprintf(stderr,Usage);
exit(1);
}

void
err(s)
char *s;
{
fprintf(stderr,"ttcp%s: ", trans?"-t":"-r");
perror(s);
fprintf(stderr,"errno=%d\n",errno);
exit(1);
}

void
mes(s)
char *s;
{
fprintf(stderr,"ttcp%s: %s\n", trans?"-t":"-r", s);
}

void
pattern( cp, cnt )
register char *cp;
register int cnt;
{
register char c;
c = 0;
while( cnt-- > 0 ) {
while( !isprint((c&0x7F)) ) c++;
*cp++ = (c++&0x7F);
}
}

/******* timing *********/

#ifdef SYSV
extern long time();
static long time0;
static struct tms tms0;
#else
static struct timeval time0; /* Time at which timeing started */
static struct rusage ru0; /* Resource utilization at the start */

static void prusage();
static void tvadd();
static void tvsub();
static void psecs();
#endif

/*
* P R E P _ T I M E R
*/
void
prep_timer()
{
#ifdef SYSV
(void)time(&time0);
(void)times(&tms0);
#else
gettimeofday(&time0, (struct timezone *)0);
getrusage(RUSAGE_SELF, &ru0);
#endif
}

/*
* R E A D _ T I M E R
*
*/
double
read_timer(str,len)
char *str;
{
#ifdef SYSV
long now;
struct tms tmsnow;
char line[132];

(void)time(&now);
realt = now-time0;
(void)times(&tmsnow);
cput = tmsnow.tms_utime - tms0.tms_utime;
cput /= HZ;
if( cput < 0.00001 ) cput = 0.01;
if( realt < 0.00001 ) realt = cput;
sprintf(line,"%g CPU secs in %g elapsed secs (%g%%)",
cput, realt,
cput/realt*100 );
(void)strncpy( str, line, len );
return( cput );
#else
/* BSD */
struct timeval timedol;
struct rusage ru1;
struct timeval td;
struct timeval tend, tstart;
char line[132];

getrusage(RUSAGE_SELF, &ru1);
gettimeofday(&timedol, (struct timezone *)0);
prusage(&ru0, &ru1, &timedol, &time0, line);
(void)strncpy( str, line, len );

/* Get real time */
tvsub( &td, &timedol, &time0 );
realt = td.tv_sec + ((double)td.tv_usec) / 1000000;

/* Get CPU time (user+sys) */
tvadd( &tend, &ru1.ru_utime, &ru1.ru_stime );
tvadd( &tstart, &ru0.ru_utime, &ru0.ru_stime );
tvsub( &td, &tend, &tstart );
cput = td.tv_sec + ((double)td.tv_usec) / 1000000;
if( cput < 0.00001 ) cput = 0.00001;
return( cput );
#endif
}

#ifndef SYSV
static void
prusage(r0, r1, e, b, outp)
register struct rusage *r0, *r1;
struct timeval *e, *b;
char *outp;
{
struct timeval tdiff;
register time_t t;
register char *cp;
register int i;
int ms;

t = (r1->ru_utime.tv_sec-r0->ru_utime.tv_sec)*100+
(r1->ru_utime.tv_usec-r0->ru_utime.tv_usec)/10000+
(r1->ru_stime.tv_sec-r0->ru_stime.tv_sec)*100+
(r1->ru_stime.tv_usec-r0->ru_stime.tv_usec)/10000;
ms = (e->tv_sec-b->tv_sec)*100 + (e->tv_usec-b->tv_usec)/10000;

#define END(x) {while(*x) x++;}
cp = "%Uuser %Ssys %Ereal %P %Xi+%Dd %Mmaxrss %F+%Rpf %Ccsw";
for (; *cp; cp++) {
if (*cp != '%')
*outp++ = *cp;
else if (cp[1]) switch(*++cp) {

case 'U':
tvsub(&tdiff, &r1->ru_utime, &r0->ru_utime);
sprintf(outp,"%d.%01ld",(int) tdiff.tv_sec, tdiff.tv_usec/100000);
END(outp);
break;

case 'S':
tvsub(&tdiff, &r1->ru_stime, &r0->ru_stime);
sprintf(outp,"%d.%01ld", (int )tdiff.tv_sec, tdiff.tv_usec/100000);
END(outp);
break;

case 'E':
psecs(ms / 100, outp);
END(outp);
break;

case 'P':
sprintf(outp,"%d%%", (int) (t*100 / ((ms ? ms : 1))));
END(outp);
break;

case 'W':
i = r1->ru_nswap - r0->ru_nswap;
sprintf(outp,"%d", i);
END(outp);
break;

case 'X':
sprintf(outp,"%ld", t == 0 ? 0 : (r1->ru_ixrss-r0->ru_ixrss)/t);
END(outp);
break;

case 'D':
sprintf(outp,"%ld", t == 0 ? 0 :
(r1->ru_idrss+r1->ru_isrss-(r0->ru_idrss+r0->ru_isrss))/t);
END(outp);
break;

case 'K':
sprintf(outp,"%ld", t == 0 ? 0 :
((r1->ru_ixrss+r1->ru_isrss+r1->ru_idrss) -
(r0->ru_ixrss+r0->ru_idrss+r0->ru_isrss))/t);
END(outp);
break;

case 'M':
sprintf(outp,"%ld", r1->ru_maxrss/2);
END(outp);
break;

case 'F':
sprintf(outp,"%ld", r1->ru_majflt-r0->ru_majflt);
END(outp);
break;

case 'R':
sprintf(outp,"%ld", r1->ru_minflt-r0->ru_minflt);
END(outp);
break;

case 'I':
sprintf(outp,"%ld", r1->ru_inblock-r0->ru_inblock);
END(outp);
break;

case 'O':
sprintf(outp,"%ld", r1->ru_oublock-r0->ru_oublock);
END(outp);
break;
case 'C':
sprintf(outp,"%ld+%ld", r1->ru_nvcsw-r0->ru_nvcsw,
r1->ru_nivcsw-r0->ru_nivcsw );
END(outp);
break;
}
}
*outp = '\0';
}

static void
tvadd(tsum, t0, t1)
struct timeval *tsum, *t0, *t1;
{

tsum->tv_sec = t0->tv_sec + t1->tv_sec;
tsum->tv_usec = t0->tv_usec + t1->tv_usec;
if (tsum->tv_usec > 1000000)
tsum->tv_sec++, tsum->tv_usec -= 1000000;
}

static void
tvsub(tdiff, t1, t0)
struct timeval *tdiff, *t1, *t0;
{

tdiff->tv_sec = t1->tv_sec - t0->tv_sec;
tdiff->tv_usec = t1->tv_usec - t0->tv_usec;
if (tdiff->tv_usec < 0)
tdiff->tv_sec--, tdiff->tv_usec += 1000000;
}

static void
psecs(l,cp)
long l;
register char *cp;
{
register int i;

i = l / 3600;
if (i) {
sprintf(cp,"%d:", i);
END(cp);
i = l % 3600;
sprintf(cp,"%d%d", (i/60) / 10, (i/60) % 10);
END(cp);
} else {
i = l;
sprintf(cp,"%d", i / 60);
END(cp);
}
i %= 60;
*cp++ = ':';
sprintf(cp,"%d%d", i / 10, i % 10);
}
#endif

/*
* N R E A D
*/
int Nread( int fd, char *buf, int count )
{
struct sockaddr_in from;
int len = sizeof(from);
register int cnt;
if( udp ) {
cnt = recvfrom( fd, buf, count, 0, &from, &len );
} else {
if( b_flag )
cnt = mread( fd, buf, count ); /* fill buf */
else
cnt = read( fd, buf, count );
}
return(cnt);
}

/*
* N W R I T E
*/
int Nwrite( int fd, char *buf, int count )
{
register int cnt;
if( udp ) {
again:
cnt = sendto( fd, buf, count, 0, &sinhim, sizeof(sinhim) );
if( cnt<0 && errno == ENOBUFS ) {
usleep(18000);
errno = 0;
goto again;
}
} else {
cnt = write( fd, buf, count );
}
return(cnt);
}
#if 0
int
delay(us)
{
struct timeval tv;

tv.tv_sec = 0;
tv.tv_usec = us;
(void)select( 1, (char *)0, (char *)0, (char *)0, &tv );
return(1);
}
#endif
/*
* M R E A D
*
* This function performs the function of a read(II) but will
* call read(II) multiple times in order to get the requested
* number of characters. This can be necessary because
* network connections don't deliver data with the same
* grouping as it is written with. Written by Robert S. Miles, BRL.
*/
int
mread(fd, bufp, n)
int fd;
register char *bufp;
unsigned n;
{
register unsigned count = 0;
register int nread;

do {
nread = read(fd, bufp, n-count);
if(nread < 0) {
perror("ttcp_mread");
return(-1);
}
if(nread == 0)
return((int)count);
count += (unsigned)nread;
bufp += nread;
} while(count < n);

return((int)count);
}
/* vmsplice moves pages backing a user address range to a pipe.
However,
* you don't want the application changing data in that address range
* after the pages have been moved to the pipe, but before they have
been
* consumed at their destination.
*
* The solution is to double buffer:
* load buffer A, vmsplice to pipe
* load buffer B, vmsplice to pipe
* When the B->splice->pipe call completes, there can no longer be any
* references in the pipe to the pages backing buffer A, since it is now
* filled with references to the pages backing buffer B. So, it is safe
* to load new data into buffer A.
*/
void setup_vmsplice()
{
int i;
size_t pg_sz = getpagesize();
/*
for (i=0; i < VM_BUFFERS; i++){
if (pipe(pfds[i]) < 0) {
err("opening pipe");
}
}*/
if (pipe(pfds) < 0) {
err("opening pipe");
}

if (pipe_buf_size > MAX_SPLICE_SIZE)
pipe_buf_size = MAX_SPLICE_SIZE;
pipe_buf_size = ALIGN(pipe_buf_size, pg_sz);

pipe_buf = malloc(VM_BUFFERS*pipe_buf_size + pg_sz);
if (!pipe_buf) {
err("Allocating data buffer");
}
pipe_buf = (char *)ALIGN((unsigned long)pipe_buf,
(unsigned long)pg_sz);
fprintf(stderr, "setup_vmsplice: buf %p , buf_size %x\n",
pipe_buf, pipe_buf_size);
}
#if 1
uint64_t vmsplice_recv(int sd, int fd)
{
struct iovec iov;
uint64_t bytes = 0;
ssize_t n, m, l;
unsigned i = 1;

// fprintf(stderr, "vmsplice_recv: sd %d fd %d\n", sd, fd);
again:
i = (i + 1) % VM_BUFFERS;
iov.iov_base = pipe_buf + i * pipe_buf_size;
l = 0;

again2:
m = read(sd, iov.iov_base + l, pipe_buf_size - l);
/* fprintf(stderr, "vmsplice_recv: read 0x%x bytes, buf size 0x%x, l %d, buffer %p\n",
m, pipe_buf_size, l, iov.iov_base + l);*/

if (m < 0) {
if (errno == EINTR)
goto again2;
perror("Read");
exit(EXIT_FAILURE);
}
if (m == 0) {
if (l == 0) {
fprintf(stderr, "vmsplice_recv: l reached zero. bytes is 0x%x\n", (int) bytes);
fdatasync(fd);
return bytes;
}
}
else {
l += m;
if (l != pipe_buf_size)
goto again2;

}
while (l) {
unsigned int splice_flags = 0;

iov.iov_len = l;
if ((l & 0x3FF) == 0)
splice_flags = SPLICE_F_MOVE;

n = vmsplice(pfds[1], &iov, 1, splice_flags);
/* fprintf(stderr, "vmsplice_recv: vmsplice 0x%x bytes to pipe\n", n);*/

if (n < 0) {
perror("vmsplice to pipe");
fprintf(stderr, "vmsplice_recv: iov_len %d iov.iov_base %x\n", iov.iov_len,
iov.iov_base);
exit(EXIT_FAILURE);
}
while (n) {
m = splice(pfds[0], NULL, fd, NULL, n, splice_flags);
// fprintf(stderr, "vmsplice_recv: vmsplice 0x%x bytes to file\n", m);
if (m < 0) {
perror("splice to file");
exit(EXIT_FAILURE);
}
if (m & 0x3FF)
{
fprintf(stderr, "splice: returned %x \n", m);
}
n -= m;
l -= m;
bytes += m;
iov.iov_base += m;
}
}
goto again;
}
#endif
#ifndef SPLICE_H
#define SPLICE_H

#include <errno.h>
#include <sys/uio.h>
#include <sys/stat.h>
#include <linux/unistd.h>

#if defined(__i386__)
#define __NR_sys_splice 313
#define __NR_sys_tee 315
#define __NR_sys_vmsplice 316
#elif defined(__x86_64__)
#define __NR_sys_splice 275
#define __NR_sys_tee 276
#define __NR_sys_vmsplice 278
#elif defined(__powerpc__) || defined(__powerpc64__)
#define __NR_sys_splice 283
#define __NR_sys_tee 284
#define __NR_sys_vmsplice 285
#elif defined(__ia64__)
#define __NR_sys_splice 1297
#define __NR_sys_tee 1301
#define __NR_sys_vmsplice 1302
#elif defined(__arm__)
#define __NR_sys_splice 322
#define __NR_sys_tee 323
#define __NR_sys_vmsplice 324
#else
#error unsupported arch
#endif

#define SPLICE_F_MOVE (0x01) /* move pages instead of copying */
#define SPLICE_F_NONBLOCK (0x02) /* don't block on the pipe splicing (but */
/* we may still block on the fd we splice */
/* from/to, of course */
#define SPLICE_F_MORE (0x04) /* expect more data */
#define SPLICE_F_GIFT (0x08) /* pages passed in are a gift */

#define SYS_splice __NR_sys_splice
#define SYS_tee __NR_sys_tee
#define SYS_vmsplice __NR_sys_vmsplice

static inline
int splice(int fd_in, off64_t *off_in, int fd_out, off64_t *off_out,
size_t len, unsigned int flags)
{
return syscall(SYS_splice, fd_in, off_in,
fd_out, off_out, len, flags);
}
static inline
int tee(int fdin, int fdout, size_t len, unsigned int flags)
{
return syscall(SYS_tee, fdin, fdout, len, flags);
}

static inline
int vmsplice(int fd, const struct iovec *iov,
unsigned long nr_segs, unsigned int flags)
{
return syscall(SYS_vmsplice, fd, iov, nr_segs, flags);
}


#if 0

_syscall6(int, sys_splice, int, fdin, loff_t *, off_in, int, fdout, loff_t *, off_out, size_t, len, unsigned int, flags);
#endif
#if 0
_syscall4(int, sys_vmsplice, int, fd, const struct iovec *, iov, unsigned long, nr_segs, unsigned int, flags);
_syscall4(int, sys_tee, int, fdin, int, fdout, size_t, len, unsigned int, flags);
#endif
#if 0
static inline int splice(int fdin, loff_t *off_in, int fdout, loff_t *off_out,
size_t len, unsigned long flags)
{
return sys_splice(fdin, off_in, fdout, off_out, len, flags);
}

static inline int tee(int fdin, int fdout, size_t len, unsigned int flags)
{
return sys_tee(fdin, fdout, len, flags);
}

static inline int vmsplice(int fd, const struct iovec *iov,
unsigned long nr_segs, unsigned int flags)
{
return sys_vmsplice(fd, iov, nr_segs, flags);
}
#endif
#define SPLICE_SIZE (64*1024)

#define BUG_ON(c) assert(!(c))

#define min(x,y) ({ \
typeof(x) _x = (x); \
typeof(y) _y = (y); \
(void) (&_x == &_y); \
_x < _y ? _x : _y; })

#define max(x,y) ({ \
typeof(x) _x = (x); \
typeof(y) _y = (y); \
(void) (&_x == &_y); \
_x > _y ? _x : _y; })

static inline int error(const char *n)
{
perror(n);
return -1;
}

static int __check_pipe(int pfd)
{
struct stat sb;

if (fstat(pfd, &sb) < 0)
return error("stat");
if (!S_ISFIFO(sb.st_mode))
return 1;

return 0;
}

static inline int check_input_pipe(void)
{
if (!__check_pipe(STDIN_FILENO))
return 0;

fprintf(stderr, "stdin must be a pipe\n");
return 1;
}

static inline int check_output_pipe(void)
{
if (!__check_pipe(STDOUT_FILENO))
return 0;

fprintf(stderr, "stdout must be a pipe\n");
return 1;
}

#endif

Attachment: Makefile
Description: Binary data