From a544abc39c61a2cb8346b474c13eec7572d1e796 Mon Sep 17 00:00:00 2001 From: leitner Date: Fri, 4 Apr 2014 18:11:03 +0000 Subject: [PATCH] switch io_fds from array to newly implemented (hopefully thread-safe) iarray switch epoll from level triggering to edge triggering --- CHANGES | 4 +- array/iarray_allocate.c | 6 +- buffer/buffer_mmapread.c | 2 +- iarray.h | 3 +- io/io_block.c | 2 +- io/io_canread.c | 4 +- io/io_canwrite.c | 4 +- io/io_close.c | 2 +- io/io_dontwantread.c | 13 +--- io/io_dontwantwrite.c | 13 +--- io/io_eagain.c | 2 +- io/io_fd.c | 21 +++++- io/io_finishandshutdown.c | 2 +- io/io_getcookie.c | 2 +- io/io_mmapwritefile.c | 2 +- io/io_nonblock.c | 2 +- io/io_sendfile.c | 4 +- io/io_setcookie.c | 2 +- io/io_timeout.c | 2 +- io/io_timeouted.c | 6 +- io/io_tryread.c | 4 +- io/io_tryreadtimeout.c | 2 +- io/io_trywrite.c | 4 +- io/io_trywritetimeout.c | 2 +- io/io_waitread.c | 4 +- io/io_waituntil2.c | 134 ++++++++++++-------------------------- io/io_waitwrite.c | 4 +- io/io_wantread.c | 38 +++++------ io/io_wantwrite.c | 38 +++++------ io/iob_send.c | 2 +- io_internal.h | 5 +- t.c | 1 + test/io5.c | 1 + 33 files changed, 147 insertions(+), 190 deletions(-) diff --git a/CHANGES b/CHANGES index 43ec845..c91ff1c 100644 --- a/CHANGES +++ b/CHANGES @@ -6,12 +6,14 @@ change buffer to have a destructor function pointer SECURITY: fmt_strn would write one byte too many (returned right length though, so usually not a problem as that byte would be overwritten with \0 by the caller anyway) fmt_pad and fmt_fill fail more gracefully when srclen > maxlen - You can now say $ make WERROR=-Werror (compiling t.c will fail but that's not part of the library) + You can now say $ make WERROR=-Werror (compiling t.c may fail but that's strictly speaking not part of the library) scan_html now decodes HTML entities based on the actual w3c list (from entities.json, say gmake update to fetch the current version) added fmt_escapechar* to fmt.h (implement various escaping mechanisms also found in textcode but for a single char not a whole string, and they always escape, not just when they think it's needed) scan_ushort was supposed to abort early and return 5 when attempting to parse "65536", because the result does not fit. It did not. Now it does. scan_*long, scan_*int, scan_*short now properly abort if the number would not fit SECURITY: check for integer overflow in stralloc_ready + switch io_fds from array to newly implemented (hopefully thread-safe) iarray + switch epoll from level triggering to edge triggering 0.29: save 8 bytes in taia.h for 64-bit systems diff --git a/array/iarray_allocate.c b/array/iarray_allocate.c index 5e5dc1a..9618820 100644 --- a/array/iarray_allocate.c +++ b/array/iarray_allocate.c @@ -17,7 +17,7 @@ static iarray_page* new_page(size_t pagesize) { } void* iarray_allocate(iarray* ia,size_t pos) { - size_t index; + size_t index,prevlen=ia->len; iarray_page** p=&ia->pages[pos%(sizeof(ia->pages)/sizeof(ia->pages[0]))]; iarray_page* newpage=0; for (index=0; poselemperpage; index+=ia->elemperpage) { @@ -28,7 +28,11 @@ void* iarray_allocate(iarray* ia,size_t pos) { newpage=0; } if (index+ia->elemperpage>pos) { + size_t l; if (newpage) munmap(newpage,ia->bytesperpage); + do { + l=__CAS(&ia->len,prevlen,pos); + } while (ldata[(pos-index)*ia->elemsize]; } p=&(*p)->next; diff --git a/buffer/buffer_mmapread.c b/buffer/buffer_mmapread.c index e9239f8..9a73f6f 100644 --- a/buffer/buffer_mmapread.c +++ b/buffer/buffer_mmapread.c @@ -6,7 +6,7 @@ static ssize_t op() { } int buffer_mmapread(buffer* b,const char* filename) { - if (!(b->x=mmap_read(filename,&b->n))) return -1; + if (!(b->x=(char*)mmap_read(filename,&b->n))) return -1; b->p=0; b->a=b->n; b->fd=-1; b->op=op; diff --git a/iarray.h b/iarray.h index 22e11ef..6a6a35e 100644 --- a/iarray.h +++ b/iarray.h @@ -24,12 +24,13 @@ typedef struct _iarray_page { typedef struct { iarray_page* pages[16]; - size_t elemsize,elemperpage,bytesperpage; + size_t elemsize,elemperpage,bytesperpage,len; } iarray; void iarray_init(iarray* ia,size_t elemsize); void* iarray_get(iarray* ia,size_t pos); void* iarray_allocate(iarray* ia,size_t pos); +size_t iarray_length(iarray* ia); /* WARNING: do not use the array during or after iarray_free, make sure * no threads are potentially doing anything with the iarray while it is diff --git a/io/io_block.c b/io/io_block.c index 1deb265..3ccfb11 100644 --- a/io/io_block.c +++ b/io/io_block.c @@ -13,7 +13,7 @@ #endif void io_block(int64 d) { - io_entry* e=array_get(&io_fds,sizeof(io_entry),d); + io_entry* e=iarray_get(&io_fds,d); #ifdef __MINGW32__ unsigned long i=0; if (ioctlsocket( d, FIONBIO, &i)==0) diff --git a/io/io_canread.c b/io/io_canread.c index 8a02ed0..a6c8fa5 100644 --- a/io/io_canread.c +++ b/io/io_canread.c @@ -14,7 +14,7 @@ int64 io_canread() { if (first_readable==-1) #ifdef HAVE_SIGIO { - if (alt_firstread>=0 && (e=array_get(&io_fds,sizeof(io_entry),alt_firstread)) && e->canread) { + if (alt_firstread>=0 && (e=iarray_get(&io_fds,alt_firstread)) && e->canread) { debug_printf(("io_canread: normal read queue is empty, swapping in alt read queue (starting with %ld)\n",alt_firstread)); first_readable=alt_firstread; alt_firstread=-1; @@ -26,7 +26,7 @@ int64 io_canread() { #endif for (;;) { int64 r; - e=array_get(&io_fds,sizeof(io_entry),first_readable); + e=iarray_get(&io_fds,first_readable); if (!e) break; r=first_readable; first_readable=e->next_read; diff --git a/io/io_canwrite.c b/io/io_canwrite.c index beecade..0495e5f 100644 --- a/io/io_canwrite.c +++ b/io/io_canwrite.c @@ -10,7 +10,7 @@ int64 io_canwrite() { if (first_writeable==-1) #ifdef HAVE_SIGIO { - if (alt_firstwrite>=0 && (e=array_get(&io_fds,sizeof(io_entry),alt_firstwrite)) && e->canwrite) { + if (alt_firstwrite>=0 && (e=iarray_get(&io_fds,alt_firstwrite)) && e->canwrite) { debug_printf(("io_canwrite: normal write queue is empty, swapping in alt write queue (starting with %ld)\n",alt_firstwrite)); first_writeable=alt_firstwrite; alt_firstwrite=-1; @@ -22,7 +22,7 @@ int64 io_canwrite() { #endif for (;;) { int64 r; - e=array_get(&io_fds,sizeof(io_entry),first_writeable); + e=iarray_get(&io_fds,first_writeable); if (!e) break; r=first_writeable; first_writeable=e->next_write; diff --git a/io/io_close.c b/io/io_close.c index b5b6d04..50fa62d 100644 --- a/io/io_close.c +++ b/io/io_close.c @@ -12,7 +12,7 @@ extern void io_dontwantwrite_really(int64 d,io_entry* e); void io_close(int64 d) { io_entry* e; - if ((e=array_get(&io_fds,sizeof(io_entry),d))) { + if ((e=iarray_get(&io_fds,d))) { e->inuse=0; e->cookie=0; if (e->kernelwantread) io_dontwantread_really(d,e); diff --git a/io/io_dontwantread.c b/io/io_dontwantread.c index 51e1898..bb16515 100644 --- a/io/io_dontwantread.c +++ b/io/io_dontwantread.c @@ -26,19 +26,10 @@ void io_dontwantread_really(int64 d, io_entry* e) { int newfd; + (void)d; assert(e->kernelwantread); newfd=!e->kernelwantwrite; io_wanted_fds-=newfd; -#ifdef HAVE_EPOLL - if (io_waitmode==EPOLL) { - struct epoll_event x; - byte_zero(&x,sizeof(x)); // to shut up valgrind - x.events=0; - if (e->kernelwantwrite) x.events|=EPOLLOUT; - x.data.fd=d; - epoll_ctl(io_master,e->kernelwantwrite?EPOLL_CTL_MOD:EPOLL_CTL_DEL,d,&x); - } -#endif #ifdef HAVE_KQUEUE if (io_waitmode==KQUEUE) { struct kevent kev; @@ -63,7 +54,7 @@ void io_dontwantread_really(int64 d, io_entry* e) { } void io_dontwantread(int64 d) { - io_entry* e=array_get(&io_fds,sizeof(io_entry),d); + io_entry* e=iarray_get(&io_fds,d); if (e) { if (e->canread) io_dontwantread_really(d,e); diff --git a/io/io_dontwantwrite.c b/io/io_dontwantwrite.c index abbbbbe..37008d6 100644 --- a/io/io_dontwantwrite.c +++ b/io/io_dontwantwrite.c @@ -32,19 +32,10 @@ void io_dontwantwrite_really(int64 d,io_entry* e) { int newfd; + (void)d; assert(e->kernelwantwrite); newfd=!e->kernelwantread; io_wanted_fds-=newfd; -#ifdef HAVE_EPOLL - if (io_waitmode==EPOLL) { - struct epoll_event x; - byte_zero(&x,sizeof(x)); // to shut up valgrind - x.events=0; - if (e->wantread) x.events|=EPOLLIN; - x.data.fd=d; - epoll_ctl(io_master,e->kernelwantread?EPOLL_CTL_MOD:EPOLL_CTL_DEL,d,&x); - } -#endif #ifdef HAVE_KQUEUE if (io_waitmode==KQUEUE) { struct kevent kev; @@ -69,7 +60,7 @@ void io_dontwantwrite_really(int64 d,io_entry* e) { } void io_dontwantwrite(int64 d) { - io_entry* e=array_get(&io_fds,sizeof(io_entry),d); + io_entry* e=iarray_get(&io_fds,d); if (e) { if (e->canwrite) io_dontwantwrite_really(d,e); diff --git a/io/io_eagain.c b/io/io_eagain.c index f4da953..2bc7252 100644 --- a/io/io_eagain.c +++ b/io/io_eagain.c @@ -1,7 +1,7 @@ #include "io_internal.h" void io_eagain(int64 d) { - io_entry* e=array_get(&io_fds,sizeof(io_entry),d); + io_entry* e=iarray_get(&io_fds,d); if (e) { if (e->wantread) e->canread=0; if (e->wantwrite) e->canwrite=0; diff --git a/io/io_fd.c b/io/io_fd.c index 5d6c73d..da7b5c8 100644 --- a/io/io_fd.c +++ b/io/io_fd.c @@ -25,11 +25,18 @@ #include #endif +#ifdef __dietlibc__ +#include +#else +#define __CAS(ptr,oldval,newval) __sync_val_compare_and_swap(ptr,oldval,newval) +#endif + #ifdef __MINGW32__ #include extern HANDLE io_comport; #endif -array io_fds; +iarray io_fds; +static int io_fds_inited; uint64 io_wanted_fds; array io_pollfds; enum __io_waitmode io_waitmode; @@ -52,7 +59,17 @@ static io_entry* io_fd_internal(int64 d) { if ((r=fcntl(d,F_GETFL,0)) == -1) return 0; /* file descriptor not open */ #endif - if (!(e=array_allocate(&io_fds,sizeof(io_entry),d))) return 0; + /* Problem: we might be the first to use io_fds. We need to make sure + * we are the only ones to initialize it. So set io_fds_inited to 2 + * and not to 1. We know we are done when it is 1. We know we need + * to do something when it is 0. We know somebody else is doing it + * when it is 2. */ + if (__CAS(&io_fds_inited,0,2)==0) { + iarray_init(&io_fds,sizeof(io_entry)); + io_fds_inited=1; + } else + do { asm("" : : : "memory"); } while (io_fds_inited!=1); + if (!(e=iarray_allocate(&io_fds,d))) return 0; if (e->inuse) return e; byte_zero(e,sizeof(io_entry)); e->inuse=1; diff --git a/io/io_finishandshutdown.c b/io/io_finishandshutdown.c index 7f1f1ae..65e6a85 100644 --- a/io/io_finishandshutdown.c +++ b/io/io_finishandshutdown.c @@ -1,6 +1,6 @@ #include "io_internal.h" void io_finishandshutdown(void) { - array_reset(&io_fds); + iarray_free(&io_fds); array_reset(&io_pollfds); } diff --git a/io/io_getcookie.c b/io/io_getcookie.c index 40f55d6..101c4fd 100644 --- a/io/io_getcookie.c +++ b/io/io_getcookie.c @@ -3,6 +3,6 @@ void* io_getcookie(int64 d) { io_entry* e; - e=array_get(&io_fds,sizeof(io_entry),d); + e=iarray_get(&io_fds,d); return e?e->cookie:0; } diff --git a/io/io_mmapwritefile.c b/io/io_mmapwritefile.c index 20e81c7..653818d 100644 --- a/io/io_mmapwritefile.c +++ b/io/io_mmapwritefile.c @@ -15,7 +15,7 @@ int64 io_mmapwritefile(int64 out,int64 in,uint64 off,uint64 bytes,io_write_callb char buf[BUFSIZE]; int n,m; uint64 sent=0; - io_entry* e=array_get(&io_fds,sizeof(io_entry),out); + io_entry* e=iarray_get(&io_fds,out); if (e) { const char* c; unsigned long left; diff --git a/io/io_nonblock.c b/io/io_nonblock.c index faf6ff4..fa41a72 100644 --- a/io/io_nonblock.c +++ b/io/io_nonblock.c @@ -13,7 +13,7 @@ #endif void io_nonblock(int64 d) { - io_entry* e=array_get(&io_fds,sizeof(io_entry),d); + io_entry* e=iarray_get(&io_fds,d); #ifdef __MINGW32__ unsigned long i=1; if (ioctlsocket( d, FIONBIO, &i)==0) diff --git a/io/io_sendfile.c b/io/io_sendfile.c index 9e538b8..ef704c9 100644 --- a/io/io_sendfile.c +++ b/io/io_sendfile.c @@ -86,7 +86,7 @@ _syscall4(int,sendfile,int,out,int,in,long *,offset,unsigned long,count) int64 io_sendfile(int64 s,int64 fd,uint64 off,uint64 n) { off_t o=off; - io_entry* e=array_get(&io_fds,sizeof(io_entry),s); + io_entry* e=iarray_get(&io_fds,s); off_t i; uint64 done=0; /* What a spectacularly broken design for sendfile64. @@ -118,7 +118,7 @@ int64 io_sendfile(int64 s,int64 fd,uint64 off,uint64 n) { #include int64 io_sendfile(int64 out,int64 in,uint64 off,uint64 bytes) { - io_entry* e=array_get(&io_fds,sizeof(io_entry),out); + io_entry* e=iarray_get(&io_fds,out); if (!e) { errno=EBADF; return -3; } if (e->sendfilequeued==1) { /* we called TransmitFile, and it returned. */ diff --git a/io/io_setcookie.c b/io/io_setcookie.c index 46cb6d7..25fa614 100644 --- a/io/io_setcookie.c +++ b/io/io_setcookie.c @@ -4,6 +4,6 @@ void io_setcookie(int64 d,void* cookie) { io_entry* e; - if ((e=array_get(&io_fds,sizeof(io_entry),d))) + if ((e=iarray_get(&io_fds,d))) e->cookie=cookie; } diff --git a/io/io_timeout.c b/io/io_timeout.c index b1415f8..0f1ba82 100644 --- a/io/io_timeout.c +++ b/io/io_timeout.c @@ -1,7 +1,7 @@ #include "io_internal.h" void io_timeout(int64 d,tai6464 t) { - io_entry* e=array_get(&io_fds,sizeof(io_entry),d); + io_entry* e=iarray_get(&io_fds,d); if (e) e->timeout=t; } diff --git a/io/io_timeouted.c b/io/io_timeouted.c index 1028d11..9cdb4ea 100644 --- a/io/io_timeouted.c +++ b/io/io_timeouted.c @@ -2,13 +2,13 @@ int64 io_timeouted() { tai6464 now; - static long ptr; + static size_t ptr; io_entry* e; - long alen=array_length(&io_fds,sizeof(io_entry)); + size_t alen=iarray_length(&io_fds); taia_now(&now); ++ptr; if (ptr>=alen) ptr=0; - e=array_get(&io_fds,sizeof(io_entry),ptr); + e=iarray_get(&io_fds,ptr); if (!e) return -1; for (;ptrinuse && e->timeout.sec.x && taia_less(&e->timeout,&now)) { diff --git a/io/io_tryread.c b/io/io_tryread.c index 186e9a4..82c823d 100644 --- a/io/io_tryread.c +++ b/io/io_tryread.c @@ -20,7 +20,7 @@ * we are called. */ int64 io_tryread(int64 d,char* buf,int64 len) { - io_entry* e=array_get(&io_fds,sizeof(io_entry),d); + io_entry* e=iarray_get(&io_fds,d); if (!e) { errno=EBADF; return -3; } if (len<0) { errno=EINVAL; return -3; } if (e->readqueued==2) { @@ -82,7 +82,7 @@ int64 io_tryread(int64 d,char* buf,int64 len) { long r; struct itimerval old,new; struct pollfd p; - io_entry* e=array_get(&io_fds,sizeof(io_entry),d); + io_entry* e=iarray_get(&io_fds,d); if (!e) { errno=EBADF; return -3; } if (!e->nonblock) { p.fd=d; diff --git a/io/io_tryreadtimeout.c b/io/io_tryreadtimeout.c index 3d0876f..4b2a613 100644 --- a/io/io_tryreadtimeout.c +++ b/io/io_tryreadtimeout.c @@ -5,7 +5,7 @@ int64 io_tryreadtimeout(int64 d,char* buf,int64 len) { int64 r=io_tryread(d,buf,len); if (r==-1) { tai6464 x; - io_entry* e=array_get(&io_fds,sizeof(io_entry),d); + io_entry* e=iarray_get(&io_fds,d); taia_now(&x); if (!taia_less(&x,&e->timeout)) { errno=ETIMEDOUT; diff --git a/io/io_trywrite.c b/io/io_trywrite.c index 53f3ad1..b593eec 100644 --- a/io/io_trywrite.c +++ b/io/io_trywrite.c @@ -19,7 +19,7 @@ * stuff on I/O batches. */ int64 io_trywrite(int64 d,const char* buf,int64 len) { - io_entry* e=array_get(&io_fds,sizeof(io_entry),d); + io_entry* e=iarray_get(&io_fds,d); int r; if (!e) { errno=EBADF; return -3; } if (!e->nonblock) { @@ -75,7 +75,7 @@ int64 io_trywrite(int64 d,const char* buf,int64 len) { long r; struct itimerval old,new; struct pollfd p; - io_entry* e=array_get(&io_fds,sizeof(io_entry),d); + io_entry* e=iarray_get(&io_fds,d); io_sigpipe(); if (!e) { errno=EBADF; return -3; } if (!e->nonblock) { diff --git a/io/io_trywritetimeout.c b/io/io_trywritetimeout.c index 5c81455..4e0e08e 100644 --- a/io/io_trywritetimeout.c +++ b/io/io_trywritetimeout.c @@ -5,7 +5,7 @@ int64 io_trywritetimeout(int64 d,const char* buf,int64 len) { int64 r=io_trywrite(d,buf,len); if (r==-1) { tai6464 x; - io_entry* e=array_get(&io_fds,sizeof(io_entry),d); + io_entry* e=iarray_get(&io_fds,d); taia_now(&x); if (!taia_less(&x,&e->timeout)) { errno=ETIMEDOUT; diff --git a/io/io_waitread.c b/io/io_waitread.c index 8969d2f..235f111 100644 --- a/io/io_waitread.c +++ b/io/io_waitread.c @@ -11,7 +11,7 @@ int64 io_waitread(int64 d,char* buf,int64 len) { long r; - io_entry* e=array_get(&io_fds,sizeof(io_entry),d); + io_entry* e=iarray_get(&io_fds,d); if (!e) { errno=EBADF; return -3; } if (e->nonblock) { unsigned long i=0; @@ -32,7 +32,7 @@ int64 io_waitread(int64 d,char* buf,int64 len) { int64 io_waitread(int64 d,char* buf,int64 len) { long r; struct pollfd p; - io_entry* e=array_get(&io_fds,sizeof(io_entry),d); + io_entry* e=iarray_get(&io_fds,d); if (!e) { errno=EBADF; return -3; } if (e->nonblock) { again: diff --git a/io/io_waituntil2.c b/io/io_waituntil2.c index 313cb1d..050ecc6 100644 --- a/io/io_waituntil2.c +++ b/io/io_waituntil2.c @@ -28,6 +28,11 @@ #include #endif +#ifdef __dietlibc__ +#include +#include +#endif + #ifdef DEBUG #include #endif @@ -119,97 +124,38 @@ int64 io_waituntil2(int64 milliseconds) { if (io_waitmode==EPOLL) { int n; struct epoll_event y[100]; + io_entry* e; + if (alt_firstread>=0 && (e=iarray_get(&io_fds,alt_firstread)) && e->canread) return 1; + if (alt_firstwrite>=0 && (e=iarray_get(&io_fds,alt_firstwrite)) && e->canwrite) return 1; if ((n=epoll_wait(io_master,y,100,milliseconds))==-1) return -1; - for (i=n-1; i>=0; --i) { - io_entry* e=array_get(&io_fds,sizeof(io_entry),y[i].data.fd); + for (i=0; ikernelwantread) curevents |= EPOLLIN; - if (e->kernelwantwrite) curevents |= EPOLLOUT; - -#ifdef DEBUG - if ((y[i].events&(EPOLLIN|EPOLLPRI|EPOLLRDNORM|EPOLLRDBAND)) && !e->kernelwantread) - printf("got unexpected read event on fd #%d\n",y[i].data.fd); - if ((y[i].events&EPOLLOUT) && !e->kernelwantwrite) - printf("got unexpected write event on fd #%d\n",y[i].data.fd); -#endif - - if (y[i].events&(EPOLLERR|EPOLLHUP)) { + if (y[i].events&(POLLERR|POLLHUP)) { /* error; signal whatever app is looking for */ - if (e->wantread) y[i].events|=EPOLLIN; - if (e->wantwrite) y[i].events|=EPOLLOUT; - } - - newevents=0; - if (!e->canread || e->wantread) { - newevents|=EPOLLIN; - e->kernelwantread=1; - } else - e->kernelwantread=0; - if (!e->canwrite || e->wantwrite) { - newevents|=EPOLLOUT; - e->kernelwantwrite=1; - } else - e->kernelwantwrite=0; - - /* if we think we can not read, but the kernel tells us that we - * can, put this fd in the relevant data structures */ - if (!e->canread && (y[i].events&(EPOLLIN|EPOLLPRI|EPOLLRDNORM|EPOLLRDBAND))) { - if (e->canread) { - newevents &= ~EPOLLIN; - } else { - e->canread=1; - if (e->wantread) { - e->next_read=first_readable; - first_readable=y[i].data.fd; - } - } + if (e->wantread) y[i].events|=POLLIN; + if (e->wantwrite) y[i].events|=POLLOUT; } - /* if the kernel says the fd is writable, ... */ - if (y[i].events&EPOLLOUT) { - /* Usually, if the kernel says a descriptor is writable, we - * note it and do not tell the kernel not to tell us again. - * The idea is that once we notify the caller that the fd is - * writable, and the caller handles the event, the caller will - * just ask to be notified of future write events again. We - * are trying to save the superfluous epoll_ctl syscalls. - * If e->canwrite is set, then this gamble did not work out. - * We told the caller, yet after the caller is done we still - * got another write event. Clearly the user is implementing - * some kind of throttling and we can tell the kernel to leave - * us alone for now. */ - if (e->canwrite) { - newevents &= ~EPOLLOUT; - e->kernelwantwrite=0; - } else { - /* If !e->wantwrite: The laziness optimization in - * io_dontwantwrite hit. We did not tell the kernel that we - * are no longer interested in writing to save the syscall. - * Now we know we could write if we wanted; remember that - * and then go on. */ - e->canwrite=1; - if (e->wantwrite) { - e->next_write=first_writeable; - first_writeable=y[i].data.fd; - } - } + if (y[i].events&POLLIN && !e->canread) { + debug_printf(("io_waituntil2: enqueueing %ld in normal read queue before %ld\n",info.si_fd,first_readable)); + e->canread=1; + e->next_read=first_readable; + first_readable=y[i].data.fd; } - - if (newevents != curevents) { -#if 0 - printf("canread %d, wantread %d, kernelwantread %d, canwrite %d, wantwrite %d, kernelwantwrite %d\n", - e->canread,e->wantread,e->kernelwantread,e->canwrite,e->wantwrite,e->kernelwantwrite); - printf("newevents: read %d write %d\n",!!(newevents&EPOLLIN),!!(newevents&EPOLLOUT)); -#endif - y[i].events=newevents; - if (newevents) { - epoll_ctl(io_master,EPOLL_CTL_MOD,y[i].data.fd,y+i); - } else { - epoll_ctl(io_master,EPOLL_CTL_DEL,y[i].data.fd,y+i); - --io_wanted_fds; - } + if (y[i].events&POLLOUT && !e->canwrite) { + debug_printf(("io_waituntil2: enqueueing %ld in normal write queue before %ld\n",info.si_fd,first_writeable)); + e->canwrite=1; + e->next_write=first_writeable; + first_writeable=y[i].data.fd; } } else { +#ifdef __dietlibc__ + char buf[FMT_ULONG]; + buf[fmt_ulong(buf,y[i].data.fd)]=0; + __write2("got epoll event on invalid fd "); + __write2(buf); + __write2("!\n"); +#endif epoll_ctl(io_master,EPOLL_CTL_DEL,y[i].data.fd,y+i); } } @@ -224,7 +170,7 @@ int64 io_waituntil2(int64 milliseconds) { ts.tv_sec=milliseconds/1000; ts.tv_nsec=(milliseconds%1000)*1000000; if ((n=kevent(io_master,0,0,y,100,milliseconds!=-1?&ts:0))==-1) return -1; for (i=n-1; i>=0; --i) { - io_entry* e=array_get(&io_fds,sizeof(io_entry),y[--n].ident); + io_entry* e=iarray_get(&io_fds,y[--n].ident); if (e) { if (y[n].flags&EV_ERROR) { /* error; signal whatever app is looking for */ @@ -260,7 +206,7 @@ int64 io_waituntil2(int64 milliseconds) { timeout.dp_fds=y; if ((n=ioctl(io_master,DP_POLL,&timeout))==-1) return -1; for (i=n-1; i>=0; --i) { - io_entry* e=array_get(&io_fds,sizeof(io_entry),y[--n].fd); + io_entry* e=iarray_get(&io_fds,y[--n].fd); if (e) { if (y[n].revents&(POLLERR|POLLHUP|POLLNVAL)) { /* error; signal whatever app is looking for */ @@ -296,8 +242,8 @@ int64 io_waituntil2(int64 milliseconds) { struct timespec ts; int r; io_entry* e; - if (alt_firstread>=0 && (e=array_get(&io_fds,sizeof(io_entry),alt_firstread)) && e->canread) return 1; - if (alt_firstwrite>=0 && (e=array_get(&io_fds,sizeof(io_entry),alt_firstwrite)) && e->canwrite) return 1; + if (alt_firstread>=0 && (e=iarray_get(&io_fds,alt_firstread)) && e->canread) return 1; + if (alt_firstwrite>=0 && (e=iarray_get(&io_fds,alt_firstwrite)) && e->canwrite) return 1; if (milliseconds==-1) r=sigwaitinfo(&io_ss,&info); else { @@ -311,7 +257,7 @@ int64 io_waituntil2(int64 milliseconds) { goto dopoll; default: if (r==io_signum) { - io_entry* e=array_get(&io_fds,sizeof(io_entry),info.si_fd); + io_entry* e=iarray_get(&io_fds,info.si_fd); if (e) { if (info.si_band&(POLLERR|POLLHUP)) { /* error; signal whatever app is looking for */ @@ -351,7 +297,7 @@ dopoll: } fprintf(stderr,"Calling GetQueuedCompletionStatus %p...",io_comport); if (GetQueuedCompletionStatus(io_comport,&numberofbytes,&x,&o,milliseconds==-1?INFINITE:milliseconds)) { - io_entry* e=array_get(&io_fds,sizeof(io_entry),x); + io_entry* e=iarray_get(&io_fds,x); fprintf(stderr," OK. Got %x, e=%p\n",x,e); if (!e) return 0; e->errorcode=0; @@ -398,7 +344,7 @@ dopoll: /* we got a completion packet for a failed I/O operation */ err=GetLastError(); if (err==WAIT_TIMEOUT) return 0; /* or maybe not */ - e=array_get(&io_fds,sizeof(io_entry),x); + e=iarray_get(&io_fds,x); if (!e) return 0; /* WTF?! */ e->errorcode=err; if (o==&e->or && (e->readqueued || e->acceptqueued)) { @@ -422,8 +368,8 @@ dopoll: return 1; } #else - for (i=r=0; icanread=e->canwrite=0; if (e->wantread || e->wantwrite) { @@ -439,7 +385,7 @@ dopoll: p=array_start(&io_pollfds); if ((i=poll(array_start(&io_pollfds),r,milliseconds))<1) return -1; for (j=r-1; j>=0; --j) { - io_entry* e=array_get(&io_fds,sizeof(io_entry),p->fd); + io_entry* e=iarray_get(&io_fds,p->fd); if (p->revents&(POLLERR|POLLHUP|POLLNVAL)) { /* error; signal whatever app is looking for */ if (e->wantread) p->revents|=POLLIN; diff --git a/io/io_waitwrite.c b/io/io_waitwrite.c index 0a87b42..e340160 100644 --- a/io/io_waitwrite.c +++ b/io/io_waitwrite.c @@ -11,7 +11,7 @@ int64 io_waitwrite(int64 d,const char* buf,int64 len) { long r; - io_entry* e=array_get(&io_fds,sizeof(io_entry),d); + io_entry* e=iarray_get(&io_fds,d); if (!e) { errno=EBADF; return -3; } if (e->nonblock) { unsigned long i=0; @@ -32,7 +32,7 @@ int64 io_waitwrite(int64 d,const char* buf,int64 len) { int64 io_waitwrite(int64 d,const char* buf,int64 len) { long r; struct pollfd p; - io_entry* e=array_get(&io_fds,sizeof(io_entry),d); + io_entry* e=iarray_get(&io_fds,d); io_sigpipe(); if (!e) { errno=EBADF; return -3; } if (e->nonblock) { diff --git a/io/io_wantread.c b/io/io_wantread.c index 3fb9965..553b2b5 100644 --- a/io/io_wantread.c +++ b/io/io_wantread.c @@ -35,16 +35,6 @@ void io_wantread_really(int64 d,io_entry* e) { assert(!e->kernelwantread); newfd=!e->kernelwantwrite; io_wanted_fds+=newfd; -#ifdef HAVE_EPOLL - if (io_waitmode==EPOLL) { - struct epoll_event x; - byte_zero(&x,sizeof(x)); // to shut up valgrind - x.events=EPOLLIN; - if (e->kernelwantwrite) x.events|=EPOLLOUT; - x.data.fd=d; - epoll_ctl(io_master,e->kernelwantwrite?EPOLL_CTL_MOD:EPOLL_CTL_ADD,d,&x); - } -#endif #ifdef HAVE_KQUEUE if (io_waitmode==KQUEUE) { struct kevent kev; @@ -63,15 +53,25 @@ void io_wantread_really(int64 d,io_entry* e) { write(io_master,&x,sizeof(x)); } #endif -#ifdef HAVE_SIGIO - if (io_waitmode==_SIGIO) { +#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL) + if (io_waitmode==_SIGIO || io_waitmode==EPOLL) { struct pollfd p; - p.fd=d; - p.events=POLLIN; - switch (poll(&p,1,0)) { - case 1: e->canread=1; break; - case 0: e->canread=0; break; - case -1: return; + if (io_waitmode==EPOLL && !e->epolladded) { + struct epoll_event x; + byte_zero(&x,sizeof(x)); // shut up valgrind + x.events=EPOLLIN|EPOLLOUT|EPOLLET; + x.data.fd=d; + epoll_ctl(io_master,EPOLL_CTL_ADD,d,&x); + e->epolladded=1; + } + if (e->canread==0) { + p.fd=d; + p.events=POLLIN; + switch (poll(&p,1,0)) { + case 1: e->canread=1; break; +// case 0: e->canread=0; break; + case -1: return; + } } if (e->canread) { debug_printf(("io_wantread: enqueueing %lld in normal read queue (next is %ld)\n",d,first_readable)); @@ -113,7 +113,7 @@ queueread: } void io_wantread(int64 d) { - io_entry* e=array_get(&io_fds,sizeof(io_entry),d); + io_entry* e=iarray_get(&io_fds,d); if (!e || e->wantread) return; if (e->canread) { e->next_read=first_readable; diff --git a/io/io_wantwrite.c b/io/io_wantwrite.c index 10d97ab..3bf2bff 100644 --- a/io/io_wantwrite.c +++ b/io/io_wantwrite.c @@ -40,16 +40,6 @@ void io_wantwrite_really(int64 d, io_entry* e) { assert(!e->kernelwantwrite); /* we should not be here if we already told the kernel we want to write */ newfd=(!e->kernelwantread); io_wanted_fds+=newfd; -#ifdef HAVE_EPOLL - if (io_waitmode==EPOLL) { - struct epoll_event x; - byte_zero(&x,sizeof(x)); // to shut up valgrind - x.events=EPOLLOUT; - if (e->kernelwantread) x.events|=EPOLLIN; - x.data.fd=d; - epoll_ctl(io_master,e->kernelwantread?EPOLL_CTL_MOD:EPOLL_CTL_ADD,d,&x); - } -#endif #ifdef HAVE_KQUEUE if (io_waitmode==KQUEUE) { struct kevent kev; @@ -68,15 +58,25 @@ void io_wantwrite_really(int64 d, io_entry* e) { write(io_master,&x,sizeof(x)); } #endif -#ifdef HAVE_SIGIO - if (io_waitmode==_SIGIO) { +#if defined(HAVE_SIGIO) || defined(HAVE_EPOLL) + if (io_waitmode==_SIGIO || io_waitmode==EPOLL) { struct pollfd p; - p.fd=d; - p.events=POLLOUT; - switch (poll(&p,1,0)) { - case 1: e->canwrite=1; break; - case 0: e->canwrite=0; break; - case -1: return; + if (io_waitmode==EPOLL && !e->epolladded) { + struct epoll_event x; + byte_zero(&x,sizeof(x)); // shut up valgrind + x.events=EPOLLIN|EPOLLOUT|EPOLLET; + x.data.fd=d; + epoll_ctl(io_master,EPOLL_CTL_ADD,d,&x); + e->epolladded=1; + } + if (e->canwrite==0) { + p.fd=d; + p.events=POLLOUT; + switch (poll(&p,1,0)) { + case 1: e->canwrite=1; break; +// case 0: e->canwrite=0; break; + case -1: return; + } } if (e->canwrite) { debug_printf(("io_wantwrite: enqueueing %lld in normal write queue before %ld\n",d,first_readable)); @@ -99,7 +99,7 @@ void io_wantwrite_really(int64 d, io_entry* e) { } void io_wantwrite(int64 d) { - io_entry* e=array_get(&io_fds,sizeof(io_entry),d); + io_entry* e=iarray_get(&io_fds,d); if (!e) return; if (e->wantwrite && e->kernelwantwrite) return; if (e->canwrite) { diff --git a/io/iob_send.c b/io/iob_send.c index 3b69bde..72ac854 100644 --- a/io/iob_send.c +++ b/io/iob_send.c @@ -17,7 +17,7 @@ int64 iob_send(int64 s,io_batch* b) { if (b->bytesleft==0) return 0; sent=-1; - e=array_get(&io_fds,sizeof(io_entry),s); + e=iarray_get(&io_fds,s); if (!e) { errno=EBADF; return -3; } if (!(x=array_get(&b->b,sizeof(iob_entry),b->next))) return -3; /* can't happen error */ diff --git a/io_internal.h b/io_internal.h index 0092cc9..70e0475 100644 --- a/io_internal.h +++ b/io_internal.h @@ -4,6 +4,7 @@ #include "io.h" #include "array.h" +#include "iarray.h" #ifdef __MINGW32__ #include "socket.h" my_extern HANDLE io_comport; @@ -36,6 +37,7 @@ my_extern HANDLE io_comport; typedef struct { tai6464 timeout; + int fd; unsigned int wantread:1; /* does the app want to read/write? */ unsigned int wantwrite:1; unsigned int canread:1; /* do we know we can read/write? */ @@ -44,6 +46,7 @@ typedef struct { unsigned int inuse:1; /* internal consistency checking */ unsigned int kernelwantread:1; /* did we tell the kernel we want to read/write? */ unsigned int kernelwantwrite:1; + unsigned int epolladded:1; #ifdef __MINGW32__ unsigned int readqueued:2; unsigned int writequeued:2; @@ -71,7 +74,7 @@ typedef struct { extern int io_multithreaded; extern int io_sockets[2]; -my_extern array io_fds; +my_extern iarray io_fds; my_extern uint64 io_wanted_fds; my_extern array io_pollfds; diff --git a/t.c b/t.c index 0903acb..ca2192e 100644 --- a/t.c +++ b/t.c @@ -51,6 +51,7 @@ int main(int argc,char* argv[]) { char buf[1024]; size_t l; unsigned char c; + (void)writecb; printf("%d\n",(c=scan_fromhex('.'))); (void)argc; (void)argv; diff --git a/test/io5.c b/test/io5.c index 163b24c..e24b1db 100644 --- a/test/io5.c +++ b/test/io5.c @@ -47,6 +47,7 @@ int main() { buffer_putulong(buffer_2,n); buffer_puts(buffer_2,")"); if (io_fd(n)) { + io_nonblock(n); io_wantread(n); } else { buffer_puts(buffer_2,", but io_fd failed.");