You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

429 lines
10 KiB
C

#ifdef __MINGW32__
#include <windows.h>
#include <mswsock.h>
#include <errno.h>
#include "io_internal.h"
#include "iob_internal.h"
#include <stdio.h>
/*
__ ___ _
\ \ / (_)_ __ __| | _____ _____
\ \ /\ / /| | '_ \ / _` |/ _ \ \ /\ / / __|
\ V V / | | | | | (_| | (_) \ V V /\__ \
\_/\_/ |_|_| |_|\__,_|\___/ \_/\_/ |___/
*/
int64 iob_send(int64 s,io_batch* b) {
/* Windows has a sendfile called TransmitFile, which can send one
* header and one trailer buffer. */
iob_entry* x,* last;
io_entry* e;
int64 sent;
int i;
if (b->bytesleft==0) return 0;
sent=-1;
e=iarray_get(&io_fds,s);
if (!e) { errno=EBADF; return -3; }
if (!(x=array_get(&b->b,sizeof(iob_entry),b->next)))
return -3; /* can't happen error */
last=(iob_entry*)(((char*)array_start(&b->b))+array_bytes(&b->b));
fprintf(stderr,"iob_send() called!\n");
if (e->canwrite || e->sendfilequeued==1) {
fprintf(stderr,"...reaping finished WriteFile/TransmitFile.\n");
/* An overlapping write finished. Reap the result. */
if (e->bytes_written==-1) return -3;
if (e->bytes_written<x->n) {
sent=e->bytes_written;
if (x->n < e->bytes_written) {
e->bytes_written-=x->n;
x->n=0;
++x;
}
x->n -= e->bytes_written;
x->offset += e->bytes_written;
b->bytesleft -= e->bytes_written;
}
e->canwrite=0; e->sendfilequeued=0;
}
for (i=0; x+i<last; ++i)
if (x[i].n) break;
if (x[i].type==FROMBUF) {
fprintf(stderr,"found non-sent buffer batch entry at %d\n",i);
if (x+i+1 < last &&
(x[i+1].type==FROMFILE)) {
fprintf(stderr,"Next is a file, can use TransmitFile\n",i);
TRANSMIT_FILE_BUFFERS tfb;
e->sendfilequeued=1;
memset(&tfb,0,sizeof(tfb));
memset(&e[i].os,0,sizeof(e[i].os));
e[i].os.Offset=x[i].offset;
e[i].os.OffsetHigh=(x[i].offset>>32);
fprintf(stderr,"Calling TransmitFile on %p...",s);
if (!TransmitFile(s,(HANDLE)x[i].fd,
x[i].n+tfb.HeadLength>0xffff?0xffff:x[i].n,
0,&e[i].os,&tfb,TF_USE_KERNEL_APC)) {
if (GetLastError()==ERROR_IO_PENDING) {
fprintf(stderr," pending.!\n");
e->writequeued=1;
errno=EAGAIN;
e->errorcode=0;
return -1;
} else {
fprintf(stderr," failed!\n");
e->errorcode=errno;
return -3;
}
}
fprintf(stderr," OK!\n");
return sent;
} else {
e->writequeued=1;
fprintf(stderr,"Queueing WriteFile on %p...",s);
if (!WriteFile(s,x[i].buf+x[i].offset,x[i].n,0,&e->ow)) {
if (GetLastError()==ERROR_IO_PENDING) {
fprintf(stderr," pending.\n");
e->writequeued=1;
errno=EAGAIN;
e->errorcode=0;
return -1;
} else {
fprintf(stderr," failed.\n");
e->errorcode=errno;
return -3;
}
}
return sent;
}
} else {
fprintf(stderr,"Calling TransmitFile...\n");
e->sendfilequeued=1;
memset(&e[i].os,0,sizeof(e[i].os));
e[i].os.Offset=x[i].offset;
e[i].os.OffsetHigh=(x[i].offset>>32);
if (!TransmitFile(s,(HANDLE)x[i].fd,
x[i].n>0xffff?0xffff:x[i].n,
0,&e[i].os,0,TF_USE_KERNEL_APC))
return -3;
return sent;
}
}
#else
/*
_ _ _
| | | |_ __ (_)_ __
| | | | '_ \| \ \/ /
| |_| | | | | |> <
\___/|_| |_|_/_/\_\
*/
#include "havebsdsf.h"
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <errno.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include "havealloca.h"
#include "io_internal.h"
#include "iob_internal.h"
int64 iob_send(int64 s,io_batch* b) {
iob_entry* e,* last;
io_entry* E;
struct iovec* v;
uint64 total;
int64 sent=0;
size_t i;
size_t headers;
#ifdef MSG_MORE
int docork;
#endif
#ifdef HAVE_BSDSENDFILE
long trailers;
#endif
#ifdef TCP_CORK
int corked=0;
#endif
#ifdef MSG_ZEROCOPY
size_t sum=0;
#endif
if (b->bytesleft==0) return 0;
E=iarray_get(&io_fds,s);
// if (!E) { errno=EBADF; return -3; }
last=(iob_entry*)(((char*)array_start(&b->b))+array_bytes(&b->b));
v=alloca(b->bufs*sizeof(struct iovec));
total=0;
for (;;) {
if (!(e=array_get(&b->b,sizeof(iob_entry),b->next)))
return -3; /* can't happen error */
#ifdef HAVE_BSDSENDFILE
/* BSD sendfile can send headers and trailers. If we run on BSD, we
* should try to exploit this. */
headers=trailers=0;
#endif
for (i=0; e+i<last; ++i) {
if (e[i].type==FROMFILE) break;
v[i].iov_base=(char*)(e[i].buf+e[i].offset);
v[i].iov_len=e[i].n;
#ifdef MSG_ZEROCOPY
if (sum + v[i].iov_len > sum) sum += v[i].iov_len; else sum=-1;
#endif
}
headers=i;
#ifdef HAVE_BSDSENDFILE
/*
____ ____ ____
| __ ) ___|| _ \
| _ \___ \| | | |
| |_) |__) | |_| |
|____/____/|____/
*/
if (e+i<last && e[i].type==FROMFILE) {
#ifdef DEBUGONLINUX
off_t sbytes;
struct sf_hdtr hdr;
int r;
for (++i; e+i<last; ++i) {
if (e[i].type==FROMFILE) break;
v[i-1].iov_base=(char*)(e[i].buf+e[i].offset);
v[i-1].iov_len=e[i].n;
++trailers;
}
hdr.headers=v; hdr.hdr_cnt=headers;
hdr.trailers=v+headers; hdr.trl_cnt=trailers;
r=sendfile(e[headers].fd,s,e[headers].offset,e[headers].n,&hdr,&sbytes,0);
if (r==0)
sent=b->bytesleft;
else if (r==-1 && errno==EAGAIN) {
if (!(sent=sbytes)) {
sent=-1;
goto eagain;
}
} else
sent=-3;
#endif // DEBUGONLINUX
} else {
if (headers==1) /* cosmetics for strace */
sent=write(s,v[0].iov_base,v[0].iov_len);
else
sent=writev(s,v,headers);
if (sent==-1) {
if (errno!=EAGAIN)
sent=-3;
else {
#ifdef DEBUGONLINUX
eagain:
#endif
io_eagain_write(s);
return -1;
}
}
}
#else
/*
_ _
| | (_)_ __ _ ___ __
| | | | '_ \| | | \ \/ /
| |___| | | | | |_| |> <
|_____|_|_| |_|\__,_/_/\_\
*/
/* Linux has two ways to coalesce sent data; either setsockopt
* TCP_CORK or sendto/sendmsg with MSG_MORE. MSG_MORE saves syscalls
* in one scenario: when there is n buffers and then possibly one
* file to send. If there is more buffers after the file, then we
* need to use TCP_CORK to prevent the TCP push after the file. */
#ifdef MSG_MORE
if (e+i==last)
docork=-1; /* no files, only buffer, so no need for TCP_CORK or MSG_MORE */
else
docork=!(e+i+1==last);
if (docork>0) {
setsockopt(s,IPPROTO_TCP,TCP_CORK,(int[]){ 1 },sizeof(int));
corked=1;
}
if (headers) {
int ZEROCOPY=0;
#ifdef MSG_ZEROCOPY
static int nozerocopy;
int dozerocopy=1;
#else
const int nozerocopy=0;
const int dozerocopy=1;
#endif
if (nozerocopy && dozerocopy==0 && docork<0) { /* write+writev */
if (headers==1) /* cosmetics for strace */
sent=write(s,v[0].iov_base,v[0].iov_len);
else
sent=writev(s,v,headers);
} else {
#if defined(MSG_ZEROCOPY) && defined(SO_ZEROCOPY)
if (!nozerocopy && sum>=8*1024) {
/* MSG_ZEROCOPY has page table management overhead,
* it only pays off after 8k or so */
if (E && E->zerocopy==0) {
if (setsockopt(s, SOL_SOCKET, SO_ZEROCOPY, (int[]){ 1 },sizeof(int)) == 0) {
E->zerocopy=1;
ZEROCOPY=MSG_ZEROCOPY;
} else
nozerocopy=1;
} else
ZEROCOPY=MSG_ZEROCOPY;
}
#endif
if (headers==1) /* cosmetics for strace */
sent=sendto(s, v[0].iov_base, v[0].iov_len, MSG_MORE|ZEROCOPY, NULL, 0);
else {
struct msghdr msg;
memset(&msg,0,sizeof(msg));
msg.msg_iov=v;
msg.msg_iovlen=headers;
/* Difficulty: sendmsg has a built-in limit on the cmsgdata on
* Linux, net.core.optmem_max with a 20k default size. So we
* need to do a little song and dance here to not trigger it */
if (headers > 50) {
size_t skip=0, totalsent=0;
for (skip=0; skip<headers; skip+=50) {
size_t i,n;
int64 l=0;
n = headers-skip; if (n > 50) n=50;
for (i=0; i<n; ++i) l += v[skip+i].iov_len;
// printf("writing %d records from offset %d, %d bytes\n", skip, n, l);
if (E && E->notsock)
notsockwritev:
sent=writev(s,v+skip,n);
else {
msg.msg_iov=v + skip;
msg.msg_iovlen=n;
sent=sendmsg(s,&msg,MSG_MORE|ZEROCOPY);
if (sent==-1 && errno==ENOTSOCK) {
if (E) E->notsock=1;
goto notsockwritev;
}
}
if (sent > 0) totalsent += sent;
if (sent == l) continue; // we sent as much as we wanted, go for next batch
if (sent >= 0) // we wrote something but not the whole batch
break;
// we got an error, maybe EAGAIN
if (errno==EAGAIN) {
if (totalsent == 0) {
io_eagain_write(s);
return -1;
}
// fall through
}
// actual error
break;
}
// if we get here, we wrote it all or we got an EAGAIN after
// writing something. Treat as regular partial write.
sent = totalsent;
} else {
if (E && E->notsock)
notsockwritev2:
sent=writev(s,v,headers);
else {
msg.msg_iov=v;
msg.msg_iovlen=headers;
sent=sendmsg(s,&msg,MSG_MORE|ZEROCOPY);
if (sent==-1 && errno==ENOTSOCK) {
if (E) E->notsock=1;
goto notsockwritev2;
}
}
}
}
}
if (sent==-1) {
if (errno==EAGAIN) {
io_eagain_write(s);
return -1;
}
sent=-3;
}
} else
sent=io_sendfile(s,e->fd,e->offset,e->n);
#else /* !MSG_MORE */
#ifdef TCP_CORK
if (b->bufs && b->files && !b->next) {
setsockopt(s,IPPROTO_TCP,TCP_CORK,(int[]){ 1 },sizeof(int));
corked=1;
}
#endif
if (headers) {
if (headers==1) /* cosmetics for strace */
sent=write(s,v[0].iov_base,v[0].iov_len);
else
sent=writev(s,v,headers);
if (sent==-1) {
if (errno==EAGAIN) {
io_eagain_write(s);
return -1;
}
sent=-3;
}
} else {
sent=io_sendfile(s,e->fd,e->offset,e->n);
}
#endif /* !MSG_MORE */
#endif
if (sent>0)
total+=sent;
else
return total?total:(uint64)sent;
if ((uint64)sent==b->bytesleft) {
iob_reset(b);
break;
} else if (sent>0) {
uint64 rest=sent;
b->bytesleft-=rest;
for (i=0; e+i<last; ++i) {
if (e[i].n<=rest) {
rest-=e[i].n;
if (b->autofree) {
if (e[i].cleanup)
e[i].cleanup(e+i);
e[i].cleanup=0;
}
++b->next;
if (!rest) break;
} else {
e[i].offset+=rest;
e[i].n-=rest;
goto abort;
}
}
if (b->bytesleft==0) {
io_eagain_write(s);
return total;
}
} else break;
}
abort:
#ifdef TCP_CORK
if (corked)
setsockopt(s,IPPROTO_TCP,TCP_CORK,(int[]){ 0 },sizeof(int));
#endif
return total;
}
#endif