From 2a2e1ddc75fb7b2f2be07dba2f0c2013ee367e1a Mon Sep 17 00:00:00 2001 From: leitner Date: Fri, 12 Sep 2003 22:03:51 +0000 Subject: [PATCH] add sigio support extend code to cope with edge triggered event notification add io_eagain() so that you can notify io_wait if accept() returned EAGAIN --- CHANGES | 5 +++++ GNUmakefile | 11 ++++++++--- io.h | 4 ++++ io/io_canread.c | 32 ++++++++++++++++++++++++++----- io/io_canwrite.c | 32 ++++++++++++++++++++++++++----- io/io_eagain.c | 19 ++++++++++++++++++ io/io_fd.c | 33 +++++++++++++++++++++++++++++-- io/io_tryread.c | 6 ++++++ io/io_trywrite.c | 6 ++++++ io/io_waituntil2.c | 48 +++++++++++++++++++++++++++++++++++++++++++++- io/io_wantread.c | 22 ++++++++++++++++++++- io/io_wantwrite.c | 22 ++++++++++++++++++++- io/iob_send.c | 5 ++++- io_internal.h | 29 ++++++++++++++++++++++------ test/httpd.c | 20 ++++++++++++------- 15 files changed, 262 insertions(+), 32 deletions(-) create mode 100644 io/io_eagain.c diff --git a/CHANGES b/CHANGES index 9aeed8b..65d2bc6 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,8 @@ +0.17: + add Linux SIGIO support to IO + expand IO api to be able to cope with edge triggered event + notification: introduce io_eagain + 0.16: add buffer_fromsa (make buffer from stralloc) add API for integer multiply with overflow detection diff --git a/GNUmakefile b/GNUmakefile index e3eeb82..a66c8ef 100644 --- a/GNUmakefile +++ b/GNUmakefile @@ -60,7 +60,7 @@ $(DNS_OBJS): dns.h stralloc.h taia.h tai.h uint64.h iopause.h $(CASE_OBJS): case.h $(ARRAY_OBJS): uint64.h array.h $(MULT_OBJS): uint64.h uint32.h uint16.h safemult.h -$(IO_OBJS): uint64.h array.h io.h io_internal.h taia.h tai.h haveepoll.h havekqueue.h +$(IO_OBJS): uint64.h array.h io.h io_internal.h taia.h tai.h haveepoll.h havekqueue.h havesigio.h iob_addbuf.o iob_addfile.o iob_new.o iob_reset.o iob_send.o: iob_internal.h iob.h @@ -111,7 +111,7 @@ t: t.o libowfat.a .PHONY: all clean tar install rename clean: rm -f *.o *.a *.da *.bbg *.bb core t haveip6.h haven2i.h havesl.h haveinline.h \ -iopause.h select.h havekqueue.h haveepoll.h libepoll Makefile +iopause.h select.h havekqueue.h haveepoll.h libepoll havesigio.h Makefile dep INCLUDES=buffer.h byte.h fmt.h ip4.h ip6.h mmap.h scan.h socket.h str.h stralloc.h \ uint16.h uint32.h uint64.h open.h textcode.h tai.h taia.h dns.h iopause.h case.h \ @@ -172,6 +172,11 @@ haveepoll.h: tryepoll.c libepoll: haveepoll.h if test "z`cat haveepoll.h`" = "z#define HAVE_EPOLL 2"; then echo -lepoll; fi > $@ +havesigio.h: trysigio.c + -rm -f $@ + if $(DIET) $(CC) $(CFLAGS) -c trysigio.c >/dev/null 2>&1; then echo "#define HAVE_SIGIO"; fi > $@ + -rm -f trysigio.o + iopause.h: iopause.h1 iopause.h2 trypoll.c -rm -f $@ if $(DIET) $(CC) $(CFLAGS) -o t trypoll.c >/dev/null 2>&1; then cp iopause.h2 iopause.h; else cp iopause.h1 iopause.h; fi @@ -195,7 +200,7 @@ socket_remote6.o: havesl.h fmt_xlong.o scan_xlong.o fmt_ip6_flat.o $(TEXTCODE_OBJS): haveinline.h -dep: haveip6.h haven2i.h havesl.h haveinline.h iopause.h select.h +dep: haveip6.h haven2i.h havesl.h haveinline.h iopause.h select.h haveepoll.h havekqueue.h gcc -I. -MM $(wildcard */*.c) t.c > dep libdep: diff --git a/io.h b/io.h index f4fcb3d..cf2816d 100644 --- a/io.h +++ b/io.h @@ -45,6 +45,10 @@ void io_wait(); void io_waituntil(tai6464 t); void io_check(); +/* signal that read/accept/whatever returned EAGAIN */ +/* needed for SIGIO */ +void io_eagain(int64 d); + /* return next descriptor from io_wait that can be read from */ int64 io_canread(); /* return next descriptor from io_wait that can be written to */ diff --git a/io/io_canread.c b/io/io_canread.c index 9ecdc3c..7c4c35d 100644 --- a/io/io_canread.c +++ b/io/io_canread.c @@ -6,12 +6,34 @@ int64 io_canread() { io_entry* e; - if (first_readable==-1) return -1; - e=array_get(&io_fds,sizeof(io_entry),first_readable); - if (e && e->canread) { - int64 r=first_readable; + if (first_readable==-1) +#ifdef HAVE_SIGIO + { + if (alt_firstread>=0 && (e=array_get(&io_fds,sizeof(io_entry),alt_firstread)) && e->canread) { + debug_printf(("io_canread: normal read queue is empty, swapping in alt read queue (starting with %ld)\n",alt_firstread)); + first_readable=alt_firstread; + alt_firstread=-1; + } else + return -1; + } +#else + return -1; +#endif + for (;;) { + int64 r; + e=array_get(&io_fds,sizeof(io_entry),first_readable); + if (!e) break; + r=first_readable; first_readable=e->next_read; - return r; + debug_printf(("io_canread: dequeue %lld from normal read queue (next is %ld)\n",r,first_readable)); + if (e->wantread && e->canread) { +#ifdef HAVE_SIGIO + e->next_read=alt_firstread; + alt_firstread=r; + debug_printf(("io_canread: enqueue %ld in alt read queue (next is %ld)\n",alt_firstread,e->next_read)); +#endif + return r; + } } return -1; } diff --git a/io/io_canwrite.c b/io/io_canwrite.c index 9a9c2a7..9865aac 100644 --- a/io/io_canwrite.c +++ b/io/io_canwrite.c @@ -6,12 +6,34 @@ int64 io_canwrite() { io_entry* e; - if (first_writeable==-1) return -1; - e=array_get(&io_fds,sizeof(io_entry),first_writeable); - if (e && e->canwrite) { - int64 r=first_writeable; + if (first_writeable==-1) +#ifdef HAVE_SIGIO + { + if (alt_firstwrite>=0 && (e=array_get(&io_fds,sizeof(io_entry),alt_firstwrite)) && e->canwrite) { + debug_printf(("io_canwrite: normal write queue is empty, swapping in alt write queue (starting with %ld)\n",alt_firstwrite)); + first_writeable=alt_firstwrite; + alt_firstwrite=-1; + } else + return -1; + } +#else + return -1; +#endif + for (;;) { + int64 r; + e=array_get(&io_fds,sizeof(io_entry),first_writeable); + if (!e) break; + r=first_writeable; first_writeable=e->next_write; - return r; + debug_printf(("io_canwrite: dequeue %lld from normal write queue (next is %ld)\n",r,first_writeable)); + if (e->wantwrite && e->canwrite) { +#ifdef HAVE_SIGIO + e->next_write=alt_firstwrite; + alt_firstwrite=r; + debug_printf(("io_canwrite: enqueue %ld in alt write queue (next is %ld)\n",alt_firstwrite,e->next_write)); +#endif + return r; + } } return -1; } diff --git a/io/io_eagain.c b/io/io_eagain.c new file mode 100644 index 0000000..76f8650 --- /dev/null +++ b/io/io_eagain.c @@ -0,0 +1,19 @@ +#include "io_internal.h" + +void io_eagain(int64 d) { + io_entry* e=array_get(&io_fds,sizeof(io_entry),d); + if (e) { + e->canread=0; +#ifdef HAVE_SIGIO + if (d==alt_firstread) { + debug_printf(("io_eagain: dequeueing %lld from alt read queue (next is %ld)\n",d,e->next_read)); + alt_firstread=e->next_read; + } + if (d==alt_firstwrite) { + debug_printf(("io_eagain: dequeueing %lld from alt write queue (next is %ld)\n",d,e->next_write)); + alt_firstwrite=e->next_write; + } +#endif + e->next_read=-1; + } +} diff --git a/io/io_fd.c b/io/io_fd.c index 5ed8d8e..f8f4511 100644 --- a/io/io_fd.c +++ b/io/io_fd.c @@ -1,7 +1,14 @@ +#define extern +#include "io_internal.h" +#undef extern +#ifdef HAVE_SIGIO +#define _GNU_SOURCE +#include +#include +#endif #include #include -#include "io_internal.h" #ifdef HAVE_KQUEUE #include #endif @@ -20,16 +27,38 @@ int io_fd(int64 d) { if (r&O_NDELAY) e->nonblock=1; e->next_read=e->next_write=-1; if (io_waitmode==UNDECIDED) { + first_readable=first_writeable=-1; #if defined(HAVE_EPOLL) io_master=epoll_create(1000); if (io_master!=-1) io_waitmode=EPOLL; #endif #if defined(HAVE_KQUEUE) - if (io_waitmode==POLL) { /* who knows, maybe someone supports both one day */ + if (io_waitmode==UNDECIDED) { /* who knows, maybe someone supports both one day */ io_master=kqueue(); if (io_master!=-1) io_waitmode=KQUEUE; } #endif +#if defined(HAVE_SIGIO) + alt_firstread=alt_firstwrite=-1; + if (io_waitmode==UNDECIDED) { + io_signum=SIGRTMIN+1; + if (sigemptyset(&io_ss)==0 && + sigaddset(&io_ss,io_signum)==0 && + sigaddset(&io_ss,SIGIO)==0 && + sigprocmask(SIG_BLOCK,&io_ss,0)==0) + io_waitmode=_SIGIO; + } +#endif + } +#if defined(HAVE_SIGIO) + if (io_waitmode==_SIGIO) { + fcntl(d,F_SETOWN,getpid()); + fcntl(d,F_SETSIG,io_signum); +#if defined(O_ONESIGFD) && defined(F_SETAUXFL) + fcntl(d, F_SETAUXFL, O_ONESIGFD); +#endif + fcntl(d,F_SETFL,fcntl(d,F_GETFL)|O_NONBLOCK|O_ASYNC); } +#endif return 1; } diff --git a/io/io_tryread.c b/io/io_tryread.c index d69aa4a..0c35bcc 100644 --- a/io/io_tryread.c +++ b/io/io_tryread.c @@ -42,6 +42,12 @@ int64 io_tryread(int64 d,char* buf,int64 len) { } if (r==-1 || r==0) { e->canread=0; +#ifdef HAVE_SIGIO + if (d==alt_firstread) { + debug_printf(("io_tryread: dequeueing %ld from alt read queue (next is %ld)\n",d,e->next_read)); + alt_firstread=e->next_read; + } +#endif e->next_read=-1; } return r; diff --git a/io/io_trywrite.c b/io/io_trywrite.c index 73586c6..4dbfa4e 100644 --- a/io/io_trywrite.c +++ b/io/io_trywrite.c @@ -42,6 +42,12 @@ int64 io_trywrite(int64 d,const char* buf,int64 len) { } if (r==-1 || r==0) { e->canwrite=0; +#ifdef HAVE_SIGIO + if (d==alt_firstwrite) { + debug_printf(("io_trywrite: dequeueing %ld from alt write queue (next is %ld)\n",d,e->next_write)); + alt_firstwrite=e->next_write; + } +#endif e->next_write=-1; } return r; diff --git a/io/io_waituntil2.c b/io/io_waituntil2.c index e5d3ddf..996a741 100644 --- a/io/io_waituntil2.c +++ b/io/io_waituntil2.c @@ -1,8 +1,12 @@ +#include "io_internal.h" +#ifdef HAVE_SIGIO +#define _GNU_SOURCE +#include +#endif #include #include #include #include -#include "io_internal.h" #ifdef HAVE_KQUEUE #include #endif @@ -61,6 +65,48 @@ int64 io_waituntil2(int64 milliseconds) { } return n; } +#endif +#ifdef HAVE_SIGIO + if (io_waitmode==_SIGIO) { + siginfo_t info; + struct timespec ts; + int r; + io_entry* e; + if (alt_firstread>=0 && (e=array_get(&io_fds,sizeof(io_entry),alt_firstread)) && e->canread) return 1; + if (alt_firstwrite>=0 && (e=array_get(&io_fds,sizeof(io_entry),alt_firstwrite)) && e->canwrite) return 1; + if (milliseconds==-1) + r=sigwaitinfo(&io_ss,&info); + else { + ts.tv_sec=milliseconds/1000; ts.tv_nsec=(milliseconds%1000)*1000000; + r=sigtimedwait(&io_ss,&info,&ts); + } + switch (r) { + case SIGIO: + /* signal queue overflow */ + signal(io_signum,SIG_DFL); + goto dopoll; + default: + if (r==io_signum) { + io_entry* e=array_get(&io_fds,sizeof(io_entry),info.si_fd); + if (e) { + if (info.si_band&POLLIN && !e->canread) { + debug_printf(("io_waituntil2: enqueueing %ld in normal read queue before %ld\n",info.si_fd,first_readable)); + e->canread=1; + e->next_read=first_readable; + first_readable=info.si_fd; + } + if (info.si_band&POLLOUT && !e->canwrite) { + debug_printf(("io_waituntil2: enqueueing %ld in normal write queue before %ld\n",info.si_fd,first_writeable)); + e->canwrite=1; + e->next_write=first_writeable; + first_writeable=info.si_fd; + } + } + } + } + return 1; + } +dopoll: #endif for (i=r=0; i #endif +#ifdef HAVE_SIGIO +#include +#endif void io_wantread(int64 d) { int newfd; io_entry* e=array_get(&io_fds,sizeof(io_entry),d); - if (!e) return; + if (!e || e->wantread) return; newfd=(!e->wantread && !e->wantwrite); io_wanted_fds+=newfd; #ifdef HAVE_EPOLL @@ -34,6 +37,23 @@ void io_wantread(int64 d) { ts.tv_sec=0; ts.tv_nsec=0; kevent(io_master,&kev,1,0,0,&ts); } +#endif +#ifdef HAVE_SIGIO + if (io_waitmode==_SIGIO) { + struct pollfd p; + p.fd=d; + p.events=POLLIN; + switch (poll(&p,1,0)) { + case 1: e->canread=1; break; + case 0: e->canread=0; break; + case -1: return; + } + if (e->canread) { + debug_printf(("io_wantread: enqueueing %lld in normal read queue (next is %ld)\n",d,first_readable)); + e->next_read=first_readable; + first_readable=d; + } + } #endif e->wantread=1; } diff --git a/io/io_wantwrite.c b/io/io_wantwrite.c index 2bcb0e3..cec001f 100644 --- a/io/io_wantwrite.c +++ b/io/io_wantwrite.c @@ -10,11 +10,14 @@ #ifdef HAVE_EPOLL #include #endif +#ifdef HAVE_SIGIO +#include +#endif void io_wantwrite(int64 d) { int newfd; io_entry* e=array_get(&io_fds,sizeof(io_entry),d); - if (!e) return; + if (!e || e->wantwrite) return; newfd=(!e->wantread && !e->wantwrite); io_wanted_fds+=newfd; #ifdef HAVE_EPOLL @@ -34,6 +37,23 @@ void io_wantwrite(int64 d) { ts.tv_sec=0; ts.tv_nsec=0; kevent(io_master,&kev,1,0,0,&ts); } +#endif +#ifdef HAVE_SIGIO + if (io_waitmode==_SIGIO) { + struct pollfd p; + p.fd=d; + p.events=POLLOUT; + switch (poll(&p,1,0)) { + case 1: e->canwrite=1; break; + case 0: e->canwrite=0; break; + case -1: return; + } + if (e->canwrite) { + debug_printf(("io_wantwrite: enqueueing %lld in normal write queue before %ld\n",d,first_readable)); + e->next_write=first_writeable; + first_writeable=d; + } + } #endif e->wantwrite=1; } diff --git a/io/iob_send.c b/io/iob_send.c index 2989862..0a7a476 100644 --- a/io/iob_send.c +++ b/io/iob_send.c @@ -79,7 +79,10 @@ int64 iob_send(int64 s,io_batch* b) { if (sent>0) total+=sent; else - if (!total) return -1; + if (!total) { + if (errno!=EAGAIN) return -3; + return -1; + } if (sent==b->bytesleft) { b->bytesleft=0; #ifdef TCP_CORK diff --git a/io_internal.h b/io_internal.h index b3f3a83..66b9947 100644 --- a/io_internal.h +++ b/io_internal.h @@ -2,6 +2,11 @@ #include "array.h" #include "haveepoll.h" #include "havekqueue.h" +#include "havesigio.h" +#ifdef HAVE_SIGIO +#define _GNU_SOURCE +#include +#endif typedef struct { unsigned int wantread:1; @@ -16,12 +21,12 @@ typedef struct { void* cookie; } io_entry; -array io_fds; -uint64 io_wanted_fds; -array io_pollfds; +extern array io_fds; +extern uint64 io_wanted_fds; +extern array io_pollfds; -unsigned long first_readable; -unsigned long first_writeable; +extern long first_readable; +extern long first_writeable; enum { UNDECIDED, @@ -32,10 +37,22 @@ enum { #ifdef HAVE_EPOLL ,EPOLL #endif +#ifdef HAVE_SIGIO + ,_SIGIO +#endif } io_waitmode; #if defined(HAVE_KQUEUE) || defined(HAVE_EPOLL) -int io_master; +extern int io_master; +#endif +#if defined(HAVE_SIGIO) +extern int io_signum; +extern sigset_t io_ss; + +extern long alt_firstread; +extern long alt_firstwrite; #endif int64 io_waituntil2(int64 milliseconds); + +#define debug_printf(x) diff --git a/test/httpd.c b/test/httpd.c index 8429bdd..011cc6a 100644 --- a/test/httpd.c +++ b/test/httpd.c @@ -123,6 +123,7 @@ const char* http_header(struct http_data* r,const char* h) { if (*c==' ' || *c=='\t') ++c; return c; } + return 0; } void httpresponse(struct http_data* h,int64 s) { @@ -225,17 +226,19 @@ main() { } buffer_putnlflush(buffer_2); } - if (errno!=EAGAIN) + if (errno==EAGAIN) + io_eagain(s); + else carp("socket_accept6"); } else { char buf[8192]; struct http_data* h=io_getcookie(i); int l=io_tryread(i,buf,sizeof buf); - if (l==-1) { + if (l==-3) { if (h) { array_reset(&h->r); iob_reset(&h->iob); - free(h->hdrbuf); + free(h->hdrbuf); h->hdrbuf=0; } buffer_puts(buffer_2,"io_tryread("); buffer_putulong(buffer_2,i); @@ -247,13 +250,13 @@ main() { if (h) { array_reset(&h->r); iob_reset(&h->iob); - free(h->hdrbuf); + free(h->hdrbuf); h->hdrbuf=0; } buffer_puts(buffer_2,"eof on fd #"); buffer_putulong(buffer_2,i); buffer_putnlflush(buffer_2); io_close(i); - } else { + } else if (l>0) { array_catb(&h->r,buf,l); if (array_failed(&h->r)) { httperror(h,"500 Server Error","request too long."); @@ -270,10 +273,13 @@ emerge: } while ((i=io_canwrite())!=-1) { struct http_data* h=io_getcookie(i); - if (iob_send(i,&h->iob)<=0) { + int64 r=iob_send(i,&h->iob); +/* printf("iob_send returned %lld\n",r); */ + if (r==-1) io_eagain(i); + if (r<=0) { array_trunc(&h->r); iob_reset(&h->iob); - free(h->hdrbuf); + free(h->hdrbuf); h->hdrbuf=0; if (h->keepalive) { io_dontwantwrite(i); io_wantread(i);