add sigio support

extend code to cope with edge triggered event notification
add io_eagain() so that you can notify io_wait if accept() returned EAGAIN
master
leitner 22 years ago
parent 5a4d2cc732
commit 2a2e1ddc75

@ -1,3 +1,8 @@
0.17:
add Linux SIGIO support to IO
expand IO api to be able to cope with edge triggered event
notification: introduce io_eagain
0.16:
add buffer_fromsa (make buffer from stralloc)
add API for integer multiply with overflow detection

@ -60,7 +60,7 @@ $(DNS_OBJS): dns.h stralloc.h taia.h tai.h uint64.h iopause.h
$(CASE_OBJS): case.h
$(ARRAY_OBJS): uint64.h array.h
$(MULT_OBJS): uint64.h uint32.h uint16.h safemult.h
$(IO_OBJS): uint64.h array.h io.h io_internal.h taia.h tai.h haveepoll.h havekqueue.h
$(IO_OBJS): uint64.h array.h io.h io_internal.h taia.h tai.h haveepoll.h havekqueue.h havesigio.h
iob_addbuf.o iob_addfile.o iob_new.o iob_reset.o iob_send.o: iob_internal.h iob.h
@ -111,7 +111,7 @@ t: t.o libowfat.a
.PHONY: all clean tar install rename
clean:
rm -f *.o *.a *.da *.bbg *.bb core t haveip6.h haven2i.h havesl.h haveinline.h \
iopause.h select.h havekqueue.h haveepoll.h libepoll Makefile
iopause.h select.h havekqueue.h haveepoll.h libepoll havesigio.h Makefile dep
INCLUDES=buffer.h byte.h fmt.h ip4.h ip6.h mmap.h scan.h socket.h str.h stralloc.h \
uint16.h uint32.h uint64.h open.h textcode.h tai.h taia.h dns.h iopause.h case.h \
@ -172,6 +172,11 @@ haveepoll.h: tryepoll.c
libepoll: haveepoll.h
if test "z`cat haveepoll.h`" = "z#define HAVE_EPOLL 2"; then echo -lepoll; fi > $@
havesigio.h: trysigio.c
-rm -f $@
if $(DIET) $(CC) $(CFLAGS) -c trysigio.c >/dev/null 2>&1; then echo "#define HAVE_SIGIO"; fi > $@
-rm -f trysigio.o
iopause.h: iopause.h1 iopause.h2 trypoll.c
-rm -f $@
if $(DIET) $(CC) $(CFLAGS) -o t trypoll.c >/dev/null 2>&1; then cp iopause.h2 iopause.h; else cp iopause.h1 iopause.h; fi
@ -195,7 +200,7 @@ socket_remote6.o: havesl.h
fmt_xlong.o scan_xlong.o fmt_ip6_flat.o $(TEXTCODE_OBJS): haveinline.h
dep: haveip6.h haven2i.h havesl.h haveinline.h iopause.h select.h
dep: haveip6.h haven2i.h havesl.h haveinline.h iopause.h select.h haveepoll.h havekqueue.h
gcc -I. -MM $(wildcard */*.c) t.c > dep
libdep:

@ -45,6 +45,10 @@ void io_wait();
void io_waituntil(tai6464 t);
void io_check();
/* signal that read/accept/whatever returned EAGAIN */
/* needed for SIGIO */
void io_eagain(int64 d);
/* return next descriptor from io_wait that can be read from */
int64 io_canread();
/* return next descriptor from io_wait that can be written to */

@ -6,12 +6,34 @@
int64 io_canread() {
io_entry* e;
if (first_readable==-1) return -1;
e=array_get(&io_fds,sizeof(io_entry),first_readable);
if (e && e->canread) {
int64 r=first_readable;
if (first_readable==-1)
#ifdef HAVE_SIGIO
{
if (alt_firstread>=0 && (e=array_get(&io_fds,sizeof(io_entry),alt_firstread)) && e->canread) {
debug_printf(("io_canread: normal read queue is empty, swapping in alt read queue (starting with %ld)\n",alt_firstread));
first_readable=alt_firstread;
alt_firstread=-1;
} else
return -1;
}
#else
return -1;
#endif
for (;;) {
int64 r;
e=array_get(&io_fds,sizeof(io_entry),first_readable);
if (!e) break;
r=first_readable;
first_readable=e->next_read;
return r;
debug_printf(("io_canread: dequeue %lld from normal read queue (next is %ld)\n",r,first_readable));
if (e->wantread && e->canread) {
#ifdef HAVE_SIGIO
e->next_read=alt_firstread;
alt_firstread=r;
debug_printf(("io_canread: enqueue %ld in alt read queue (next is %ld)\n",alt_firstread,e->next_read));
#endif
return r;
}
}
return -1;
}

@ -6,12 +6,34 @@
int64 io_canwrite() {
io_entry* e;
if (first_writeable==-1) return -1;
e=array_get(&io_fds,sizeof(io_entry),first_writeable);
if (e && e->canwrite) {
int64 r=first_writeable;
if (first_writeable==-1)
#ifdef HAVE_SIGIO
{
if (alt_firstwrite>=0 && (e=array_get(&io_fds,sizeof(io_entry),alt_firstwrite)) && e->canwrite) {
debug_printf(("io_canwrite: normal write queue is empty, swapping in alt write queue (starting with %ld)\n",alt_firstwrite));
first_writeable=alt_firstwrite;
alt_firstwrite=-1;
} else
return -1;
}
#else
return -1;
#endif
for (;;) {
int64 r;
e=array_get(&io_fds,sizeof(io_entry),first_writeable);
if (!e) break;
r=first_writeable;
first_writeable=e->next_write;
return r;
debug_printf(("io_canwrite: dequeue %lld from normal write queue (next is %ld)\n",r,first_writeable));
if (e->wantwrite && e->canwrite) {
#ifdef HAVE_SIGIO
e->next_write=alt_firstwrite;
alt_firstwrite=r;
debug_printf(("io_canwrite: enqueue %ld in alt write queue (next is %ld)\n",alt_firstwrite,e->next_write));
#endif
return r;
}
}
return -1;
}

@ -0,0 +1,19 @@
#include "io_internal.h"
void io_eagain(int64 d) {
io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
if (e) {
e->canread=0;
#ifdef HAVE_SIGIO
if (d==alt_firstread) {
debug_printf(("io_eagain: dequeueing %lld from alt read queue (next is %ld)\n",d,e->next_read));
alt_firstread=e->next_read;
}
if (d==alt_firstwrite) {
debug_printf(("io_eagain: dequeueing %lld from alt write queue (next is %ld)\n",d,e->next_write));
alt_firstwrite=e->next_write;
}
#endif
e->next_read=-1;
}
}

@ -1,7 +1,14 @@
#define extern
#include "io_internal.h"
#undef extern
#ifdef HAVE_SIGIO
#define _GNU_SOURCE
#include <sys/poll.h>
#include <signal.h>
#endif
#include <sys/types.h>
#include <fcntl.h>
#include "io_internal.h"
#ifdef HAVE_KQUEUE
#include <sys/event.h>
#endif
@ -20,16 +27,38 @@ int io_fd(int64 d) {
if (r&O_NDELAY) e->nonblock=1;
e->next_read=e->next_write=-1;
if (io_waitmode==UNDECIDED) {
first_readable=first_writeable=-1;
#if defined(HAVE_EPOLL)
io_master=epoll_create(1000);
if (io_master!=-1) io_waitmode=EPOLL;
#endif
#if defined(HAVE_KQUEUE)
if (io_waitmode==POLL) { /* who knows, maybe someone supports both one day */
if (io_waitmode==UNDECIDED) { /* who knows, maybe someone supports both one day */
io_master=kqueue();
if (io_master!=-1) io_waitmode=KQUEUE;
}
#endif
#if defined(HAVE_SIGIO)
alt_firstread=alt_firstwrite=-1;
if (io_waitmode==UNDECIDED) {
io_signum=SIGRTMIN+1;
if (sigemptyset(&io_ss)==0 &&
sigaddset(&io_ss,io_signum)==0 &&
sigaddset(&io_ss,SIGIO)==0 &&
sigprocmask(SIG_BLOCK,&io_ss,0)==0)
io_waitmode=_SIGIO;
}
#endif
}
#if defined(HAVE_SIGIO)
if (io_waitmode==_SIGIO) {
fcntl(d,F_SETOWN,getpid());
fcntl(d,F_SETSIG,io_signum);
#if defined(O_ONESIGFD) && defined(F_SETAUXFL)
fcntl(d, F_SETAUXFL, O_ONESIGFD);
#endif
fcntl(d,F_SETFL,fcntl(d,F_GETFL)|O_NONBLOCK|O_ASYNC);
}
#endif
return 1;
}

@ -42,6 +42,12 @@ int64 io_tryread(int64 d,char* buf,int64 len) {
}
if (r==-1 || r==0) {
e->canread=0;
#ifdef HAVE_SIGIO
if (d==alt_firstread) {
debug_printf(("io_tryread: dequeueing %ld from alt read queue (next is %ld)\n",d,e->next_read));
alt_firstread=e->next_read;
}
#endif
e->next_read=-1;
}
return r;

@ -42,6 +42,12 @@ int64 io_trywrite(int64 d,const char* buf,int64 len) {
}
if (r==-1 || r==0) {
e->canwrite=0;
#ifdef HAVE_SIGIO
if (d==alt_firstwrite) {
debug_printf(("io_trywrite: dequeueing %ld from alt write queue (next is %ld)\n",d,e->next_write));
alt_firstwrite=e->next_write;
}
#endif
e->next_write=-1;
}
return r;

@ -1,8 +1,12 @@
#include "io_internal.h"
#ifdef HAVE_SIGIO
#define _GNU_SOURCE
#include <signal.h>
#endif
#include <unistd.h>
#include <sys/time.h>
#include <poll.h>
#include <errno.h>
#include "io_internal.h"
#ifdef HAVE_KQUEUE
#include <sys/event.h>
#endif
@ -61,6 +65,48 @@ int64 io_waituntil2(int64 milliseconds) {
}
return n;
}
#endif
#ifdef HAVE_SIGIO
if (io_waitmode==_SIGIO) {
siginfo_t info;
struct timespec ts;
int r;
io_entry* e;
if (alt_firstread>=0 && (e=array_get(&io_fds,sizeof(io_entry),alt_firstread)) && e->canread) return 1;
if (alt_firstwrite>=0 && (e=array_get(&io_fds,sizeof(io_entry),alt_firstwrite)) && e->canwrite) return 1;
if (milliseconds==-1)
r=sigwaitinfo(&io_ss,&info);
else {
ts.tv_sec=milliseconds/1000; ts.tv_nsec=(milliseconds%1000)*1000000;
r=sigtimedwait(&io_ss,&info,&ts);
}
switch (r) {
case SIGIO:
/* signal queue overflow */
signal(io_signum,SIG_DFL);
goto dopoll;
default:
if (r==io_signum) {
io_entry* e=array_get(&io_fds,sizeof(io_entry),info.si_fd);
if (e) {
if (info.si_band&POLLIN && !e->canread) {
debug_printf(("io_waituntil2: enqueueing %ld in normal read queue before %ld\n",info.si_fd,first_readable));
e->canread=1;
e->next_read=first_readable;
first_readable=info.si_fd;
}
if (info.si_band&POLLOUT && !e->canwrite) {
debug_printf(("io_waituntil2: enqueueing %ld in normal write queue before %ld\n",info.si_fd,first_writeable));
e->canwrite=1;
e->next_write=first_writeable;
first_writeable=info.si_fd;
}
}
}
}
return 1;
}
dopoll:
#endif
for (i=r=0; i<array_length(&io_fds,sizeof(io_entry)); ++i) {
io_entry* e=array_get(&io_fds,sizeof(io_entry),i);

@ -10,11 +10,14 @@
#ifdef HAVE_EPOLL
#include <sys/epoll.h>
#endif
#ifdef HAVE_SIGIO
#include <sys/poll.h>
#endif
void io_wantread(int64 d) {
int newfd;
io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
if (!e) return;
if (!e || e->wantread) return;
newfd=(!e->wantread && !e->wantwrite);
io_wanted_fds+=newfd;
#ifdef HAVE_EPOLL
@ -34,6 +37,23 @@ void io_wantread(int64 d) {
ts.tv_sec=0; ts.tv_nsec=0;
kevent(io_master,&kev,1,0,0,&ts);
}
#endif
#ifdef HAVE_SIGIO
if (io_waitmode==_SIGIO) {
struct pollfd p;
p.fd=d;
p.events=POLLIN;
switch (poll(&p,1,0)) {
case 1: e->canread=1; break;
case 0: e->canread=0; break;
case -1: return;
}
if (e->canread) {
debug_printf(("io_wantread: enqueueing %lld in normal read queue (next is %ld)\n",d,first_readable));
e->next_read=first_readable;
first_readable=d;
}
}
#endif
e->wantread=1;
}

@ -10,11 +10,14 @@
#ifdef HAVE_EPOLL
#include <sys/epoll.h>
#endif
#ifdef HAVE_SIGIO
#include <sys/poll.h>
#endif
void io_wantwrite(int64 d) {
int newfd;
io_entry* e=array_get(&io_fds,sizeof(io_entry),d);
if (!e) return;
if (!e || e->wantwrite) return;
newfd=(!e->wantread && !e->wantwrite);
io_wanted_fds+=newfd;
#ifdef HAVE_EPOLL
@ -34,6 +37,23 @@ void io_wantwrite(int64 d) {
ts.tv_sec=0; ts.tv_nsec=0;
kevent(io_master,&kev,1,0,0,&ts);
}
#endif
#ifdef HAVE_SIGIO
if (io_waitmode==_SIGIO) {
struct pollfd p;
p.fd=d;
p.events=POLLOUT;
switch (poll(&p,1,0)) {
case 1: e->canwrite=1; break;
case 0: e->canwrite=0; break;
case -1: return;
}
if (e->canwrite) {
debug_printf(("io_wantwrite: enqueueing %lld in normal write queue before %ld\n",d,first_readable));
e->next_write=first_writeable;
first_writeable=d;
}
}
#endif
e->wantwrite=1;
}

@ -79,7 +79,10 @@ int64 iob_send(int64 s,io_batch* b) {
if (sent>0)
total+=sent;
else
if (!total) return -1;
if (!total) {
if (errno!=EAGAIN) return -3;
return -1;
}
if (sent==b->bytesleft) {
b->bytesleft=0;
#ifdef TCP_CORK

@ -2,6 +2,11 @@
#include "array.h"
#include "haveepoll.h"
#include "havekqueue.h"
#include "havesigio.h"
#ifdef HAVE_SIGIO
#define _GNU_SOURCE
#include <signal.h>
#endif
typedef struct {
unsigned int wantread:1;
@ -16,12 +21,12 @@ typedef struct {
void* cookie;
} io_entry;
array io_fds;
uint64 io_wanted_fds;
array io_pollfds;
extern array io_fds;
extern uint64 io_wanted_fds;
extern array io_pollfds;
unsigned long first_readable;
unsigned long first_writeable;
extern long first_readable;
extern long first_writeable;
enum {
UNDECIDED,
@ -32,10 +37,22 @@ enum {
#ifdef HAVE_EPOLL
,EPOLL
#endif
#ifdef HAVE_SIGIO
,_SIGIO
#endif
} io_waitmode;
#if defined(HAVE_KQUEUE) || defined(HAVE_EPOLL)
int io_master;
extern int io_master;
#endif
#if defined(HAVE_SIGIO)
extern int io_signum;
extern sigset_t io_ss;
extern long alt_firstread;
extern long alt_firstwrite;
#endif
int64 io_waituntil2(int64 milliseconds);
#define debug_printf(x)

@ -123,6 +123,7 @@ const char* http_header(struct http_data* r,const char* h) {
if (*c==' ' || *c=='\t') ++c;
return c;
}
return 0;
}
void httpresponse(struct http_data* h,int64 s) {
@ -225,17 +226,19 @@ main() {
}
buffer_putnlflush(buffer_2);
}
if (errno!=EAGAIN)
if (errno==EAGAIN)
io_eagain(s);
else
carp("socket_accept6");
} else {
char buf[8192];
struct http_data* h=io_getcookie(i);
int l=io_tryread(i,buf,sizeof buf);
if (l==-1) {
if (l==-3) {
if (h) {
array_reset(&h->r);
iob_reset(&h->iob);
free(h->hdrbuf);
free(h->hdrbuf); h->hdrbuf=0;
}
buffer_puts(buffer_2,"io_tryread(");
buffer_putulong(buffer_2,i);
@ -247,13 +250,13 @@ main() {
if (h) {
array_reset(&h->r);
iob_reset(&h->iob);
free(h->hdrbuf);
free(h->hdrbuf); h->hdrbuf=0;
}
buffer_puts(buffer_2,"eof on fd #");
buffer_putulong(buffer_2,i);
buffer_putnlflush(buffer_2);
io_close(i);
} else {
} else if (l>0) {
array_catb(&h->r,buf,l);
if (array_failed(&h->r)) {
httperror(h,"500 Server Error","request too long.");
@ -270,10 +273,13 @@ emerge:
}
while ((i=io_canwrite())!=-1) {
struct http_data* h=io_getcookie(i);
if (iob_send(i,&h->iob)<=0) {
int64 r=iob_send(i,&h->iob);
/* printf("iob_send returned %lld\n",r); */
if (r==-1) io_eagain(i);
if (r<=0) {
array_trunc(&h->r);
iob_reset(&h->iob);
free(h->hdrbuf);
free(h->hdrbuf); h->hdrbuf=0;
if (h->keepalive) {
io_dontwantwrite(i);
io_wantread(i);

Loading…
Cancel
Save