switch io_fds from array to newly implemented (hopefully thread-safe) iarray
switch epoll from level triggering to edge triggering
parent dd436c5bf8
commit a544abc39c
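Note on the second change: with this commit the epoll backend registers descriptors edge triggered. A minimal sketch of what that registration looks like against the plain epoll API (epfd and fd are illustrative parameters, not names from this library):

#include <sys/epoll.h>

/* Illustrative only: register fd edge triggered on an existing epoll instance.
 * Without EPOLLET the kernel re-reports the fd on every epoll_wait() while it
 * stays ready (level triggered); with EPOLLET only readiness transitions are
 * reported, so the caller has to remember readiness and drain until EAGAIN. */
static int add_edge_triggered(int epfd, int fd) {
  struct epoll_event x;
  x.events = EPOLLIN | EPOLLOUT | EPOLLET;
  x.data.fd = fd;
  return epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &x);
}

The library itself does this once per descriptor in io_wantread_really/io_wantwrite_really; see the io_wantread.c and io_wantwrite.c hunks below.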
CHANGES
@@ -6,12 +6,14 @@
 change buffer to have a destructor function pointer
 SECURITY: fmt_strn would write one byte too many (returned right length though, so usually not a problem as that byte would be overwritten with \0 by the caller anyway)
 fmt_pad and fmt_fill fail more gracefully when srclen > maxlen
-You can now say $ make WERROR=-Werror (compiling t.c will fail but that's not part of the library)
+You can now say $ make WERROR=-Werror (compiling t.c may fail but that's strictly speaking not part of the library)
 scan_html now decodes HTML entities based on the actual w3c list (from entities.json, say gmake update to fetch the current version)
 added fmt_escapechar* to fmt.h (implement various escaping mechanisms also found in textcode but for a single char not a whole string, and they always escape, not just when they think it's needed)
 scan_ushort was supposed to abort early and return 5 when attempting to parse "65536", because the result does not fit. It did not. Now it does.
 scan_*long, scan_*int, scan_*short now properly abort if the number would not fit
 SECURITY: check for integer overflow in stralloc_ready
+switch io_fds from array to newly implemented (hopefully thread-safe) iarray
+switch epoll from level triggering to edge triggering

 0.29:
 save 8 bytes in taia.h for 64-bit systems
@@ -17,7 +17,7 @@ static iarray_page* new_page(size_t pagesize) {
 }

 void* iarray_allocate(iarray* ia,size_t pos) {
-size_t index;
+size_t index,prevlen=ia->len;
 iarray_page** p=&ia->pages[pos%(sizeof(ia->pages)/sizeof(ia->pages[0]))];
 iarray_page* newpage=0;
 for (index=0; pos<index+ia->elemperpage; index+=ia->elemperpage) {
@@ -28,7 +28,11 @@ void* iarray_allocate(iarray* ia,size_t pos) {
 newpage=0;
 }
 if (index+ia->elemperpage>pos) {
+size_t l;
 if (newpage) munmap(newpage,ia->bytesperpage);
+do {
+l=__CAS(&ia->len,prevlen,pos);
+} while (l<pos);
 return &(*p)->data[(pos-index)*ia->elemsize];
 }
 p=&(*p)->next;
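The added do/while publishes the grown length through __CAS, which the io/io_fd.c hunk below maps to GCC's __sync_val_compare_and_swap outside dietlibc. A generic sketch of that compare-and-swap pattern for raising a shared length, with the observed value refreshed on every retry (advance_length and its parameters are illustrative, not library names):

#include <stddef.h>

/* Raise *len to at least pos without locking. __sync_val_compare_and_swap
 * returns the value it found at *len: if that equals cur the swap took place,
 * otherwise another thread raced us and we retry against the value we saw. */
static void advance_length(volatile size_t* len, size_t pos) {
  size_t cur = *len;
  while (cur < pos) {
    size_t seen = __sync_val_compare_and_swap(len, cur, pos);
    if (seen == cur) break;   /* we published pos */
    cur = seen;               /* somebody else changed it; re-check */
  }
}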
@@ -6,7 +6,7 @@ static ssize_t op() {
 }

 int buffer_mmapread(buffer* b,const char* filename) {
-if (!(b->x=mmap_read(filename,&b->n))) return -1;
+if (!(b->x=(char*)mmap_read(filename,&b->n))) return -1;
 b->p=0; b->a=b->n;
 b->fd=-1;
 b->op=op;
iarray.h
@@ -24,12 +24,13 @@ typedef struct _iarray_page {

 typedef struct {
 iarray_page* pages[16];
-size_t elemsize,elemperpage,bytesperpage;
+size_t elemsize,elemperpage,bytesperpage,len;
 } iarray;

 void iarray_init(iarray* ia,size_t elemsize);
 void* iarray_get(iarray* ia,size_t pos);
 void* iarray_allocate(iarray* ia,size_t pos);
+size_t iarray_length(iarray* ia);

 /* WARNING: do not use the array during or after iarray_free, make sure
 * no threads are potentially doing anything with the iarray while it is
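With len and iarray_length() added, the header is enough to sketch how a caller might use the container; a rough usage example (the entry type and names here are made up for illustration, and error handling is omitted):

#include "iarray.h"

typedef struct { int fd; int flags; } entry;

static iarray entries;

static void entries_setup(void) {
  iarray_init(&entries, sizeof(entry));   /* fixed element size, pages mapped on demand */
}

static entry* entry_at(size_t n) {
  /* iarray_allocate() creates the slot (and its page) if needed and returns a
   * pointer that stays valid; iarray_get() only finds slots that already exist. */
  return (entry*)iarray_allocate(&entries, n);
}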
@@ -13,7 +13,7 @@
 #endif

 void io_block(int64 d) {
-io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
+io_entry* e=iarray_get(&io_fds,d);
 #ifdef __MINGW32__
 unsigned long i=0;
 if (ioctlsocket( d, FIONBIO, &i)==0)
@@ -14,7 +14,7 @@ int64 io_canread() {
 if (first_readable==-1)
 #ifdef HAVE_SIGIO
 {
-if (alt_firstread>=0 && (e=array_get(&io_fds,sizeof(io_entry),alt_firstread)) && e->canread) {
+if (alt_firstread>=0 && (e=iarray_get(&io_fds,alt_firstread)) && e->canread) {
 debug_printf(("io_canread: normal read queue is empty, swapping in alt read queue (starting with %ld)\n",alt_firstread));
 first_readable=alt_firstread;
 alt_firstread=-1;
@@ -26,7 +26,7 @@ int64 io_canread() {
 #endif
 for (;;) {
 int64 r;
-e=array_get(&io_fds,sizeof(io_entry),first_readable);
+e=iarray_get(&io_fds,first_readable);
 if (!e) break;
 r=first_readable;
 first_readable=e->next_read;
@@ -10,7 +10,7 @@ int64 io_canwrite() {
 if (first_writeable==-1)
 #ifdef HAVE_SIGIO
 {
-if (alt_firstwrite>=0 && (e=array_get(&io_fds,sizeof(io_entry),alt_firstwrite)) && e->canwrite) {
+if (alt_firstwrite>=0 && (e=iarray_get(&io_fds,alt_firstwrite)) && e->canwrite) {
 debug_printf(("io_canwrite: normal write queue is empty, swapping in alt write queue (starting with %ld)\n",alt_firstwrite));
 first_writeable=alt_firstwrite;
 alt_firstwrite=-1;
@@ -22,7 +22,7 @@ int64 io_canwrite() {
 #endif
 for (;;) {
 int64 r;
-e=array_get(&io_fds,sizeof(io_entry),first_writeable);
+e=iarray_get(&io_fds,first_writeable);
 if (!e) break;
 r=first_writeable;
 first_writeable=e->next_write;
@@ -12,7 +12,7 @@ extern void io_dontwantwrite_really(int64 d,io_entry* e);

 void io_close(int64 d) {
 io_entry* e;
-if ((e=array_get(&io_fds,sizeof(io_entry),d))) {
+if ((e=iarray_get(&io_fds,d))) {
 e->inuse=0;
 e->cookie=0;
 if (e->kernelwantread) io_dontwantread_really(d,e);
@@ -26,19 +26,10 @@

 void io_dontwantread_really(int64 d, io_entry* e) {
 int newfd;
+(void)d;
 assert(e->kernelwantread);
 newfd=!e->kernelwantwrite;
 io_wanted_fds-=newfd;
-#ifdef HAVE_EPOLL
-if (io_waitmode==EPOLL) {
-struct epoll_event x;
-byte_zero(&x,sizeof(x)); // to shut up valgrind
-x.events=0;
-if (e->kernelwantwrite) x.events|=EPOLLOUT;
-x.data.fd=d;
-epoll_ctl(io_master,e->kernelwantwrite?EPOLL_CTL_MOD:EPOLL_CTL_DEL,d,&x);
-}
-#endif
 #ifdef HAVE_KQUEUE
 if (io_waitmode==KQUEUE) {
 struct kevent kev;
@@ -63,7 +54,7 @@ void io_dontwantread_really(int64 d, io_entry* e) {
 }

 void io_dontwantread(int64 d) {
-io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
+io_entry* e=iarray_get(&io_fds,d);
 if (e) {
 if (e->canread)
 io_dontwantread_really(d,e);
@@ -32,19 +32,10 @@

 void io_dontwantwrite_really(int64 d,io_entry* e) {
 int newfd;
+(void)d;
 assert(e->kernelwantwrite);
 newfd=!e->kernelwantread;
 io_wanted_fds-=newfd;
-#ifdef HAVE_EPOLL
-if (io_waitmode==EPOLL) {
-struct epoll_event x;
-byte_zero(&x,sizeof(x)); // to shut up valgrind
-x.events=0;
-if (e->wantread) x.events|=EPOLLIN;
-x.data.fd=d;
-epoll_ctl(io_master,e->kernelwantread?EPOLL_CTL_MOD:EPOLL_CTL_DEL,d,&x);
-}
-#endif
 #ifdef HAVE_KQUEUE
 if (io_waitmode==KQUEUE) {
 struct kevent kev;
@@ -69,7 +60,7 @@ void io_dontwantwrite_really(int64 d,io_entry* e) {
 }

 void io_dontwantwrite(int64 d) {
-io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
+io_entry* e=iarray_get(&io_fds,d);
 if (e) {
 if (e->canwrite)
 io_dontwantwrite_really(d,e);
@@ -1,7 +1,7 @@
 #include "io_internal.h"

 void io_eagain(int64 d) {
-io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
+io_entry* e=iarray_get(&io_fds,d);
 if (e) {
 if (e->wantread) e->canread=0;
 if (e->wantwrite) e->canwrite=0;
io/io_fd.c
@@ -25,11 +25,18 @@
 #include <sys/devpoll.h>
 #endif

+#ifdef __dietlibc__
+#include <sys/atomic.h>
+#else
+#define __CAS(ptr,oldval,newval) __sync_val_compare_and_swap(ptr,oldval,newval)
+#endif
+
 #ifdef __MINGW32__
 #include <stdio.h>
 extern HANDLE io_comport;
 #endif
-array io_fds;
+iarray io_fds;
+static int io_fds_inited;
 uint64 io_wanted_fds;
 array io_pollfds;
 enum __io_waitmode io_waitmode;
@@ -52,7 +59,17 @@ static io_entry* io_fd_internal(int64 d) {
 if ((r=fcntl(d,F_GETFL,0)) == -1)
 return 0; /* file descriptor not open */
 #endif
-if (!(e=array_allocate(&io_fds,sizeof(io_entry),d))) return 0;
+/* Problem: we might be the first to use io_fds. We need to make sure
+* we are the only ones to initialize it. So set io_fds_inited to 2
+* and not to 1. We know we are done when it is 1. We know we need
+* to do something when it is 0. We know somebody else is doing it
+* when it is 2. */
+if (__CAS(&io_fds_inited,0,2)==0) {
+iarray_init(&io_fds,sizeof(io_entry));
+io_fds_inited=1;
+} else
+do { asm("" : : : "memory"); } while (io_fds_inited!=1);
+if (!(e=iarray_allocate(&io_fds,d))) return 0;
 if (e->inuse) return e;
 byte_zero(e,sizeof(io_entry));
 e->inuse=1;
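The comment block in the second hunk describes a small one-time-initialization protocol: 0 means "not initialized", 2 means "someone is initializing right now", 1 means "ready". The same idea isolated into a stand-alone sketch (generic names; the GCC builtin is assumed, as in the #else branch above):

static volatile int inited;   /* 0 = no, 2 = in progress, 1 = done */

static void init_once(void (*do_init)(void)) {
  if (__sync_val_compare_and_swap(&inited, 0, 2) == 0) {
    do_init();                 /* we won the race: run the real initialization */
    inited = 1;                /* publish "done" */
  } else {
    /* somebody else is (or already was) initializing; wait until it is done */
    while (inited != 1)
      __asm__ volatile("" ::: "memory");   /* compiler barrier, as in the diff */
  }
}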
@@ -1,6 +1,6 @@
 #include "io_internal.h"

 void io_finishandshutdown(void) {
-array_reset(&io_fds);
+iarray_free(&io_fds);
 array_reset(&io_pollfds);
 }
@@ -3,6 +3,6 @@

 void* io_getcookie(int64 d) {
 io_entry* e;
-e=array_get(&io_fds,sizeof(io_entry),d);
+e=iarray_get(&io_fds,d);
 return e?e->cookie:0;
 }
@@ -15,7 +15,7 @@ int64 io_mmapwritefile(int64 out,int64 in,uint64 off,uint64 bytes,io_write_callb
 char buf[BUFSIZE];
 int n,m;
 uint64 sent=0;
-io_entry* e=array_get(&io_fds,sizeof(io_entry),out);
+io_entry* e=iarray_get(&io_fds,out);
 if (e) {
 const char* c;
 unsigned long left;
@@ -13,7 +13,7 @@
 #endif

 void io_nonblock(int64 d) {
-io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
+io_entry* e=iarray_get(&io_fds,d);
 #ifdef __MINGW32__
 unsigned long i=1;
 if (ioctlsocket( d, FIONBIO, &i)==0)
@@ -86,7 +86,7 @@ _syscall4(int,sendfile,int,out,int,in,long *,offset,unsigned long,count)

 int64 io_sendfile(int64 s,int64 fd,uint64 off,uint64 n) {
 off_t o=off;
-io_entry* e=array_get(&io_fds,sizeof(io_entry),s);
+io_entry* e=iarray_get(&io_fds,s);
 off_t i;
 uint64 done=0;
 /* What a spectacularly broken design for sendfile64.
@@ -118,7 +118,7 @@ int64 io_sendfile(int64 s,int64 fd,uint64 off,uint64 n) {
 #include <mswsock.h>

 int64 io_sendfile(int64 out,int64 in,uint64 off,uint64 bytes) {
-io_entry* e=array_get(&io_fds,sizeof(io_entry),out);
+io_entry* e=iarray_get(&io_fds,out);
 if (!e) { errno=EBADF; return -3; }
 if (e->sendfilequeued==1) {
 /* we called TransmitFile, and it returned. */
@@ -4,6 +4,6 @@

 void io_setcookie(int64 d,void* cookie) {
 io_entry* e;
-if ((e=array_get(&io_fds,sizeof(io_entry),d)))
+if ((e=iarray_get(&io_fds,d)))
 e->cookie=cookie;
 }
@@ -1,7 +1,7 @@
 #include "io_internal.h"

 void io_timeout(int64 d,tai6464 t) {
-io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
+io_entry* e=iarray_get(&io_fds,d);
 if (e)
 e->timeout=t;
 }
@@ -2,13 +2,13 @@

 int64 io_timeouted() {
 tai6464 now;
-static long ptr;
+static size_t ptr;
 io_entry* e;
-long alen=array_length(&io_fds,sizeof(io_entry));
+size_t alen=iarray_length(&io_fds);
 taia_now(&now);
 ++ptr;
 if (ptr>=alen) ptr=0;
-e=array_get(&io_fds,sizeof(io_entry),ptr);
+e=iarray_get(&io_fds,ptr);
 if (!e) return -1;
 for (;ptr<alen; ++ptr,++e) {
 if (e->inuse && e->timeout.sec.x && taia_less(&e->timeout,&now)) {
@@ -20,7 +20,7 @@
 * we are called. */

 int64 io_tryread(int64 d,char* buf,int64 len) {
-io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
+io_entry* e=iarray_get(&io_fds,d);
 if (!e) { errno=EBADF; return -3; }
 if (len<0) { errno=EINVAL; return -3; }
 if (e->readqueued==2) {
@@ -82,7 +82,7 @@ int64 io_tryread(int64 d,char* buf,int64 len) {
 long r;
 struct itimerval old,new;
 struct pollfd p;
-io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
+io_entry* e=iarray_get(&io_fds,d);
 if (!e) { errno=EBADF; return -3; }
 if (!e->nonblock) {
 p.fd=d;
@@ -5,7 +5,7 @@ int64 io_tryreadtimeout(int64 d,char* buf,int64 len) {
 int64 r=io_tryread(d,buf,len);
 if (r==-1) {
 tai6464 x;
-io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
+io_entry* e=iarray_get(&io_fds,d);
 taia_now(&x);
 if (!taia_less(&x,&e->timeout)) {
 errno=ETIMEDOUT;
@@ -19,7 +19,7 @@
 * stuff on I/O batches. */

 int64 io_trywrite(int64 d,const char* buf,int64 len) {
-io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
+io_entry* e=iarray_get(&io_fds,d);
 int r;
 if (!e) { errno=EBADF; return -3; }
 if (!e->nonblock) {
@@ -75,7 +75,7 @@ int64 io_trywrite(int64 d,const char* buf,int64 len) {
 long r;
 struct itimerval old,new;
 struct pollfd p;
-io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
+io_entry* e=iarray_get(&io_fds,d);
 io_sigpipe();
 if (!e) { errno=EBADF; return -3; }
 if (!e->nonblock) {
@@ -5,7 +5,7 @@ int64 io_trywritetimeout(int64 d,const char* buf,int64 len) {
 int64 r=io_trywrite(d,buf,len);
 if (r==-1) {
 tai6464 x;
-io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
+io_entry* e=iarray_get(&io_fds,d);
 taia_now(&x);
 if (!taia_less(&x,&e->timeout)) {
 errno=ETIMEDOUT;
@@ -11,7 +11,7 @@

 int64 io_waitread(int64 d,char* buf,int64 len) {
 long r;
-io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
+io_entry* e=iarray_get(&io_fds,d);
 if (!e) { errno=EBADF; return -3; }
 if (e->nonblock) {
 unsigned long i=0;
@@ -32,7 +32,7 @@ int64 io_waitread(int64 d,char* buf,int64 len) {
 int64 io_waitread(int64 d,char* buf,int64 len) {
 long r;
 struct pollfd p;
-io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
+io_entry* e=iarray_get(&io_fds,d);
 if (!e) { errno=EBADF; return -3; }
 if (e->nonblock) {
 again:
@@ -28,6 +28,11 @@
 #include <sys/devpoll.h>
 #endif

+#ifdef __dietlibc__
+#include <fmt.h>
+#include <write12.h>
+#endif
+
 #ifdef DEBUG
 #include <stdio.h>
 #endif
@@ -119,97 +124,38 @@ int64 io_waituntil2(int64 milliseconds) {
 if (io_waitmode==EPOLL) {
 int n;
 struct epoll_event y[100];
+io_entry* e;
+if (alt_firstread>=0 && (e=iarray_get(&io_fds,alt_firstread)) && e->canread) return 1;
+if (alt_firstwrite>=0 && (e=iarray_get(&io_fds,alt_firstwrite)) && e->canwrite) return 1;
 if ((n=epoll_wait(io_master,y,100,milliseconds))==-1) return -1;
-for (i=n-1; i>=0; --i) {
-io_entry* e=array_get(&io_fds,sizeof(io_entry),y[i].data.fd);
+for (i=0; i<n; ++i) {
+e=iarray_get(&io_fds,y[i].data.fd);
 if (e) {
-int curevents=0,newevents;
-if (e->kernelwantread) curevents |= EPOLLIN;
-if (e->kernelwantwrite) curevents |= EPOLLOUT;
-
-#ifdef DEBUG
-if ((y[i].events&(EPOLLIN|EPOLLPRI|EPOLLRDNORM|EPOLLRDBAND)) && !e->kernelwantread)
-printf("got unexpected read event on fd #%d\n",y[i].data.fd);
-if ((y[i].events&EPOLLOUT) && !e->kernelwantwrite)
-printf("got unexpected write event on fd #%d\n",y[i].data.fd);
-#endif
-
-if (y[i].events&(EPOLLERR|EPOLLHUP)) {
+if (y[i].events&(POLLERR|POLLHUP)) {
 /* error; signal whatever app is looking for */
-if (e->wantread) y[i].events|=EPOLLIN;
-if (e->wantwrite) y[i].events|=EPOLLOUT;
+if (e->wantread) y[i].events|=POLLIN;
+if (e->wantwrite) y[i].events|=POLLOUT;
 }
-
-newevents=0;
-if (!e->canread || e->wantread) {
-newevents|=EPOLLIN;
-e->kernelwantread=1;
-} else
-e->kernelwantread=0;
-if (!e->canwrite || e->wantwrite) {
-newevents|=EPOLLOUT;
-e->kernelwantwrite=1;
-} else
-e->kernelwantwrite=0;
-
-/* if we think we can not read, but the kernel tells us that we
-* can, put this fd in the relevant data structures */
-if (!e->canread && (y[i].events&(EPOLLIN|EPOLLPRI|EPOLLRDNORM|EPOLLRDBAND))) {
-if (e->canread) {
-newevents &= ~EPOLLIN;
-} else {
-e->canread=1;
-if (e->wantread) {
-e->next_read=first_readable;
-first_readable=y[i].data.fd;
-}
-}
+if (y[i].events&POLLIN && !e->canread) {
+debug_printf(("io_waituntil2: enqueueing %ld in normal read queue before %ld\n",info.si_fd,first_readable));
+e->canread=1;
+e->next_read=first_readable;
+first_readable=y[i].data.fd;
 }
-/* if the kernel says the fd is writable, ... */
-if (y[i].events&EPOLLOUT) {
-/* Usually, if the kernel says a descriptor is writable, we
-* note it and do not tell the kernel not to tell us again.
-* The idea is that once we notify the caller that the fd is
-* writable, and the caller handles the event, the caller will
-* just ask to be notified of future write events again. We
-* are trying to save the superfluous epoll_ctl syscalls.
-* If e->canwrite is set, then this gamble did not work out.
-* We told the caller, yet after the caller is done we still
-* got another write event. Clearly the user is implementing
-* some kind of throttling and we can tell the kernel to leave
-* us alone for now. */
-if (e->canwrite) {
-newevents &= ~EPOLLOUT;
-e->kernelwantwrite=0;
-} else {
-/* If !e->wantwrite: The laziness optimization in
-* io_dontwantwrite hit. We did not tell the kernel that we
-* are no longer interested in writing to save the syscall.
-* Now we know we could write if we wanted; remember that
-* and then go on. */
-e->canwrite=1;
-if (e->wantwrite) {
-e->next_write=first_writeable;
-first_writeable=y[i].data.fd;
-}
-}
-}
-
-if (newevents != curevents) {
-#if 0
-printf("canread %d, wantread %d, kernelwantread %d, canwrite %d, wantwrite %d, kernelwantwrite %d\n",
-e->canread,e->wantread,e->kernelwantread,e->canwrite,e->wantwrite,e->kernelwantwrite);
-printf("newevents: read %d write %d\n",!!(newevents&EPOLLIN),!!(newevents&EPOLLOUT));
-#endif
-y[i].events=newevents;
-if (newevents) {
-epoll_ctl(io_master,EPOLL_CTL_MOD,y[i].data.fd,y+i);
-} else {
-epoll_ctl(io_master,EPOLL_CTL_DEL,y[i].data.fd,y+i);
---io_wanted_fds;
-}
+if (y[i].events&POLLOUT && !e->canwrite) {
+debug_printf(("io_waituntil2: enqueueing %ld in normal write queue before %ld\n",info.si_fd,first_writeable));
+e->canwrite=1;
+e->next_write=first_writeable;
+first_writeable=y[i].data.fd;
 }
 } else {
+#ifdef __dietlibc__
+char buf[FMT_ULONG];
+buf[fmt_ulong(buf,y[i].data.fd)]=0;
+__write2("got epoll event on invalid fd ");
+__write2(buf);
+__write2("!\n");
+#endif
 epoll_ctl(io_master,EPOLL_CTL_DEL,y[i].data.fd,y+i);
 }
 }
@@ -224,7 +170,7 @@ int64 io_waituntil2(int64 milliseconds) {
 ts.tv_sec=milliseconds/1000; ts.tv_nsec=(milliseconds%1000)*1000000;
 if ((n=kevent(io_master,0,0,y,100,milliseconds!=-1?&ts:0))==-1) return -1;
 for (i=n-1; i>=0; --i) {
-io_entry* e=array_get(&io_fds,sizeof(io_entry),y[--n].ident);
+io_entry* e=iarray_get(&io_fds,y[--n].ident);
 if (e) {
 if (y[n].flags&EV_ERROR) {
 /* error; signal whatever app is looking for */
@@ -260,7 +206,7 @@ int64 io_waituntil2(int64 milliseconds) {
 timeout.dp_fds=y;
 if ((n=ioctl(io_master,DP_POLL,&timeout))==-1) return -1;
 for (i=n-1; i>=0; --i) {
-io_entry* e=array_get(&io_fds,sizeof(io_entry),y[--n].fd);
+io_entry* e=iarray_get(&io_fds,y[--n].fd);
 if (e) {
 if (y[n].revents&(POLLERR|POLLHUP|POLLNVAL)) {
 /* error; signal whatever app is looking for */
@@ -296,8 +242,8 @@ int64 io_waituntil2(int64 milliseconds) {
 struct timespec ts;
 int r;
 io_entry* e;
-if (alt_firstread>=0 && (e=array_get(&io_fds,sizeof(io_entry),alt_firstread)) && e->canread) return 1;
-if (alt_firstwrite>=0 && (e=array_get(&io_fds,sizeof(io_entry),alt_firstwrite)) && e->canwrite) return 1;
+if (alt_firstread>=0 && (e=iarray_get(&io_fds,alt_firstread)) && e->canread) return 1;
+if (alt_firstwrite>=0 && (e=iarray_get(&io_fds,alt_firstwrite)) && e->canwrite) return 1;
 if (milliseconds==-1)
 r=sigwaitinfo(&io_ss,&info);
 else {
@@ -311,7 +257,7 @@ int64 io_waituntil2(int64 milliseconds) {
 goto dopoll;
 default:
 if (r==io_signum) {
-io_entry* e=array_get(&io_fds,sizeof(io_entry),info.si_fd);
+io_entry* e=iarray_get(&io_fds,info.si_fd);
 if (e) {
 if (info.si_band&(POLLERR|POLLHUP)) {
 /* error; signal whatever app is looking for */
@@ -351,7 +297,7 @@ dopoll:
 }
 fprintf(stderr,"Calling GetQueuedCompletionStatus %p...",io_comport);
 if (GetQueuedCompletionStatus(io_comport,&numberofbytes,&x,&o,milliseconds==-1?INFINITE:milliseconds)) {
-io_entry* e=array_get(&io_fds,sizeof(io_entry),x);
+io_entry* e=iarray_get(&io_fds,x);
 fprintf(stderr," OK. Got %x, e=%p\n",x,e);
 if (!e) return 0;
 e->errorcode=0;
@@ -398,7 +344,7 @@ dopoll:
 /* we got a completion packet for a failed I/O operation */
 err=GetLastError();
 if (err==WAIT_TIMEOUT) return 0; /* or maybe not */
-e=array_get(&io_fds,sizeof(io_entry),x);
+e=iarray_get(&io_fds,x);
 if (!e) return 0; /* WTF?! */
 e->errorcode=err;
 if (o==&e->or && (e->readqueued || e->acceptqueued)) {
@@ -422,8 +368,8 @@ dopoll:
 return 1;
 }
 #else
-for (i=r=0; i<array_length(&io_fds,sizeof(io_entry)); ++i) {
-io_entry* e=array_get(&io_fds,sizeof(io_entry),i);
+for (i=r=0; (size_t)i<iarray_length(&io_fds); ++i) {
+io_entry* e=iarray_get(&io_fds,i);
 if (!e) return -1;
 e->canread=e->canwrite=0;
 if (e->wantread || e->wantwrite) {
@@ -439,7 +385,7 @@ dopoll:
 p=array_start(&io_pollfds);
 if ((i=poll(array_start(&io_pollfds),r,milliseconds))<1) return -1;
 for (j=r-1; j>=0; --j) {
-io_entry* e=array_get(&io_fds,sizeof(io_entry),p->fd);
+io_entry* e=iarray_get(&io_fds,p->fd);
 if (p->revents&(POLLERR|POLLHUP|POLLNVAL)) {
 /* error; signal whatever app is looking for */
 if (e->wantread) p->revents|=POLLIN;
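Since the epoll branch above is now edge triggered, a caller that is told a descriptor is readable has to consume that readiness completely before waiting again; otherwise the remaining data is never reported. A hedged sketch of that calling pattern on top of the io_* API (buffer size and the processing step are illustrative):

#include "io.h"

/* Read whatever is currently available on fd. io_tryread() returns -1 once
 * the read would block, which under edge triggering is the point where the
 * kernel will only wake us again when new data arrives. */
static void drain_readable(int64 fd) {
  char buf[8192];
  for (;;) {
    int64 r = io_tryread(fd, buf, sizeof(buf));
    if (r == -1) break;    /* would block: readiness fully consumed */
    if (r <= 0) break;     /* 0 = EOF, -3 = error; stop as well */
    /* ... process r bytes from buf ... */
  }
}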
@@ -11,7 +11,7 @@

 int64 io_waitwrite(int64 d,const char* buf,int64 len) {
 long r;
-io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
+io_entry* e=iarray_get(&io_fds,d);
 if (!e) { errno=EBADF; return -3; }
 if (e->nonblock) {
 unsigned long i=0;
@@ -32,7 +32,7 @@ int64 io_waitwrite(int64 d,const char* buf,int64 len) {
 int64 io_waitwrite(int64 d,const char* buf,int64 len) {
 long r;
 struct pollfd p;
-io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
+io_entry* e=iarray_get(&io_fds,d);
 io_sigpipe();
 if (!e) { errno=EBADF; return -3; }
 if (e->nonblock) {
@@ -35,16 +35,6 @@ void io_wantread_really(int64 d,io_entry* e) {
 assert(!e->kernelwantread);
 newfd=!e->kernelwantwrite;
 io_wanted_fds+=newfd;
-#ifdef HAVE_EPOLL
-if (io_waitmode==EPOLL) {
-struct epoll_event x;
-byte_zero(&x,sizeof(x)); // to shut up valgrind
-x.events=EPOLLIN;
-if (e->kernelwantwrite) x.events|=EPOLLOUT;
-x.data.fd=d;
-epoll_ctl(io_master,e->kernelwantwrite?EPOLL_CTL_MOD:EPOLL_CTL_ADD,d,&x);
-}
-#endif
 #ifdef HAVE_KQUEUE
 if (io_waitmode==KQUEUE) {
 struct kevent kev;
@@ -63,15 +53,25 @@ void io_wantread_really(int64 d,io_entry* e) {
 write(io_master,&x,sizeof(x));
 }
 #endif
-#ifdef HAVE_SIGIO
-if (io_waitmode==_SIGIO) {
+#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL)
+if (io_waitmode==_SIGIO || io_waitmode==EPOLL) {
 struct pollfd p;
-p.fd=d;
-p.events=POLLIN;
-switch (poll(&p,1,0)) {
-case 1: e->canread=1; break;
-case 0: e->canread=0; break;
-case -1: return;
+if (io_waitmode==EPOLL && !e->epolladded) {
+struct epoll_event x;
+byte_zero(&x,sizeof(x)); // shut up valgrind
+x.events=EPOLLIN|EPOLLOUT|EPOLLET;
+x.data.fd=d;
+epoll_ctl(io_master,EPOLL_CTL_ADD,d,&x);
+e->epolladded=1;
+}
+if (e->canread==0) {
+p.fd=d;
+p.events=POLLIN;
+switch (poll(&p,1,0)) {
+case 1: e->canread=1; break;
+// case 0: e->canread=0; break;
+case -1: return;
+}
+}
 if (e->canread) {
 debug_printf(("io_wantread: enqueueing %lld in normal read queue (next is %ld)\n",d,first_readable));
@@ -113,7 +113,7 @@ queueread:
 }

 void io_wantread(int64 d) {
-io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
+io_entry* e=iarray_get(&io_fds,d);
 if (!e || e->wantread) return;
 if (e->canread) {
 e->next_read=first_readable;
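One detail worth noting in the new io_wantread_really code: the descriptor is added to epoll with EPOLLET only once (guarded by e->epolladded), and because an edge-triggered registration will not re-report readiness whose edge has already been delivered, the current state is probed directly with a zero-timeout poll() whenever the entry does not already know the descriptor is readable. The probe in isolation (hypothetical helper, not library code):

#include <poll.h>

/* Ask the kernel whether fd is readable right now, without blocking.
 * Returns 1 if readable, 0 if not, -1 on error. */
static int probe_readable(int fd) {
  struct pollfd p;
  p.fd = fd;
  p.events = POLLIN;
  switch (poll(&p, 1, 0)) {   /* timeout 0: just a snapshot */
    case 1:  return 1;
    case 0:  return 0;
    default: return -1;
  }
}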
@@ -40,16 +40,6 @@ void io_wantwrite_really(int64 d, io_entry* e) {
 assert(!e->kernelwantwrite); /* we should not be here if we already told the kernel we want to write */
 newfd=(!e->kernelwantread);
 io_wanted_fds+=newfd;
-#ifdef HAVE_EPOLL
-if (io_waitmode==EPOLL) {
-struct epoll_event x;
-byte_zero(&x,sizeof(x)); // to shut up valgrind
-x.events=EPOLLOUT;
-if (e->kernelwantread) x.events|=EPOLLIN;
-x.data.fd=d;
-epoll_ctl(io_master,e->kernelwantread?EPOLL_CTL_MOD:EPOLL_CTL_ADD,d,&x);
-}
-#endif
 #ifdef HAVE_KQUEUE
 if (io_waitmode==KQUEUE) {
 struct kevent kev;
@@ -68,15 +58,25 @@ void io_wantwrite_really(int64 d, io_entry* e) {
 write(io_master,&x,sizeof(x));
 }
 #endif
-#ifdef HAVE_SIGIO
-if (io_waitmode==_SIGIO) {
+#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL)
+if (io_waitmode==_SIGIO || io_waitmode==EPOLL) {
 struct pollfd p;
-p.fd=d;
-p.events=POLLOUT;
-switch (poll(&p,1,0)) {
-case 1: e->canwrite=1; break;
-case 0: e->canwrite=0; break;
-case -1: return;
+if (io_waitmode==EPOLL && !e->epolladded) {
+struct epoll_event x;
+byte_zero(&x,sizeof(x)); // shut up valgrind
+x.events=EPOLLIN|EPOLLOUT|EPOLLET;
+x.data.fd=d;
+epoll_ctl(io_master,EPOLL_CTL_ADD,d,&x);
+e->epolladded=1;
+}
+if (e->canwrite==0) {
+p.fd=d;
+p.events=POLLOUT;
+switch (poll(&p,1,0)) {
+case 1: e->canwrite=1; break;
+// case 0: e->canwrite=0; break;
+case -1: return;
+}
+}
 if (e->canwrite) {
 debug_printf(("io_wantwrite: enqueueing %lld in normal write queue before %ld\n",d,first_readable));
@@ -99,7 +99,7 @@ void io_wantwrite_really(int64 d, io_entry* e) {
 }

 void io_wantwrite(int64 d) {
-io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
+io_entry* e=iarray_get(&io_fds,d);
 if (!e) return;
 if (e->wantwrite && e->kernelwantwrite) return;
 if (e->canwrite) {
@@ -17,7 +17,7 @@ int64 iob_send(int64 s,io_batch* b) {

 if (b->bytesleft==0) return 0;
 sent=-1;
-e=array_get(&io_fds,sizeof(io_entry),s);
+e=iarray_get(&io_fds,s);
 if (!e) { errno=EBADF; return -3; }
 if (!(x=array_get(&b->b,sizeof(iob_entry),b->next)))
 return -3; /* can't happen error */
@@ -4,6 +4,7 @@

 #include "io.h"
 #include "array.h"
+#include "iarray.h"
 #ifdef __MINGW32__
 #include "socket.h"
 my_extern HANDLE io_comport;
@@ -36,6 +37,7 @@ my_extern HANDLE io_comport;

 typedef struct {
 tai6464 timeout;
+int fd;
 unsigned int wantread:1; /* does the app want to read/write? */
 unsigned int wantwrite:1;
 unsigned int canread:1; /* do we know we can read/write? */
@@ -44,6 +46,7 @@ typedef struct {
 unsigned int inuse:1; /* internal consistency checking */
 unsigned int kernelwantread:1; /* did we tell the kernel we want to read/write? */
 unsigned int kernelwantwrite:1;
+unsigned int epolladded:1;
 #ifdef __MINGW32__
 unsigned int readqueued:2;
 unsigned int writequeued:2;
@@ -71,7 +74,7 @@ typedef struct {
 extern int io_multithreaded;
 extern int io_sockets[2];

-my_extern array io_fds;
+my_extern iarray io_fds;
 my_extern uint64 io_wanted_fds;
 my_extern array io_pollfds;
t.c
@@ -51,6 +51,7 @@ int main(int argc,char* argv[]) {
 char buf[1024];
 size_t l;
 unsigned char c;
+(void)writecb;
 printf("%d\n",(c=scan_fromhex('.')));
 (void)argc;
 (void)argv;
@@ -47,6 +47,7 @@ int main() {
 buffer_putulong(buffer_2,n);
 buffer_puts(buffer_2,")");
 if (io_fd(n)) {
+io_nonblock(n);
 io_wantread(n);
 } else {
 buffer_puts(buffer_2,", but io_fd failed.");