diff --git a/array/array_allocate.c b/array/array_allocate.c index 0c25e67..db86ad5 100644 --- a/array/array_allocate.c +++ b/array/array_allocate.c @@ -75,6 +75,6 @@ void* array_allocate(array* x,uint64 membersize,int64 pos) { x->allocated=wanted; byte_zero(x->p+x->initialized,x->allocated-x->initialized); } - x->initialized=pos*membersize; + x->initialized=(pos+1)*membersize; return x->p+pos*membersize; } diff --git a/io.h b/io.h index 0d96388..5edf66b 100644 --- a/io.h +++ b/io.h @@ -50,6 +50,9 @@ int64 io_canread(); /* return next descriptor from io_wait that can be written to */ int64 io_canwrite(); +/* return next descriptor with expired timeout */ +int64 io_timeouted(); + /* put d on internal data structure, return 1 on success, 0 on error */ int io_fd(int64 d); diff --git a/io/io_check.c b/io/io_check.c new file mode 100644 index 0000000..2516b12 --- /dev/null +++ b/io/io_check.c @@ -0,0 +1,9 @@ +#include +#include +#include +#include +#include "io_internal.h" + +void io_check() { + io_waituntil2(0); +} diff --git a/io/io_close.c b/io/io_close.c index b00a211..190a55c 100644 --- a/io/io_close.c +++ b/io/io_close.c @@ -4,5 +4,9 @@ void io_close(int64 d) { close(d); io_entry* e=array_get(&io_fds,sizeof(io_entry),d); - if (e) e->inuse=0; + if (e) { + e->inuse=0; + io_dontwantread(d); + io_dontwantwrite(d); + } } diff --git a/io/io_fd.c b/io/io_fd.c index cfc4e11..5969a35 100644 --- a/io/io_fd.c +++ b/io/io_fd.c @@ -12,5 +12,6 @@ int io_fd(int64 d) { if (!(e=array_allocate(&io_fds,sizeof(io_entry),d))) return 0; e->inuse=1; if (r&O_NDELAY) e->nonblock=1; + e->next_read=e->next_write=-1; return 1; } diff --git a/io/io_tryread.c b/io/io_tryread.c index 4cd922b..d69aa4a 100644 --- a/io/io_tryread.c +++ b/io/io_tryread.c @@ -16,7 +16,10 @@ int64 io_tryread(int64 d,char* buf,int64 len) { p.events=POLLIN; switch (poll(&p,1,0)) { case -1: return -3; - case 0: errno=EAGAIN; return -1; + case 0: errno=EAGAIN; + e->canread=0; + e->next_read=-1; + return -1; } new.it_interval.tv_usec=0; new.it_interval.tv_sec=0; @@ -37,5 +40,9 @@ int64 io_tryread(int64 d,char* buf,int64 len) { if (errno!=EAGAIN) r=-3; } + if (r==-1 || r==0) { + e->canread=0; + e->next_read=-1; + } return r; } diff --git a/io/io_trywrite.c b/io/io_trywrite.c index 21bd2d9..73586c6 100644 --- a/io/io_trywrite.c +++ b/io/io_trywrite.c @@ -16,7 +16,10 @@ int64 io_trywrite(int64 d,const char* buf,int64 len) { p.events=POLLOUT; switch (poll(&p,1,0)) { case -1: return -3; - case 0: errno=EAGAIN; return -1; + case 0: errno=EAGAIN; + e->canwrite=0; + e->next_write=-1; + return -1; } new.it_interval.tv_usec=0; new.it_interval.tv_sec=0; @@ -32,8 +35,14 @@ int64 io_trywrite(int64 d,const char* buf,int64 len) { new.it_value.tv_sec=0; setitimer(ITIMER_REAL,&new,&old); } - if (r==-1) + if (r==-1) { + if (errno==EINTR) errno=EAGAIN; if (errno!=EAGAIN) r=-3; + } + if (r==-1 || r==0) { + e->canwrite=0; + e->next_write=-1; + } return r; } diff --git a/io/io_waituntil2.c b/io/io_waituntil2.c index 006d286..5b78a71 100644 --- a/io/io_waituntil2.c +++ b/io/io_waituntil2.c @@ -29,10 +29,18 @@ again: if (errno==EINTR) goto again; return -1; } - for (i=0; i=0; --i) { io_entry* e=array_get(&io_fds,sizeof(io_entry),p->fd); - if (p->revents&POLLIN) e->canread=1; - if (p->revents&POLLOUT) e->canwrite=1; + if (p->revents&POLLIN) { + e->canread=1; + e->next_read=first_readable; + first_readable=p->fd; + } + if (p->revents&POLLOUT) { + e->canwrite=1; + e->next_write=first_writeable; + first_writeable=p->fd; + } p++; } return i; diff --git a/io_internal.h b/io_internal.h index eb206aa..ba75b11 100644 --- a/io_internal.h +++ b/io_internal.h @@ -9,10 +9,15 @@ typedef struct { unsigned int nonblock:1; unsigned int inuse:1; tai6464 timeout; + long next_read; + long next_write; } io_entry; array io_fds; uint64 io_wanted_fds; array io_pollfds; +unsigned long first_readable; +unsigned long first_writeable; + int64 io_waituntil2(int64 milliseconds); diff --git a/test/io5.c b/test/io5.c new file mode 100644 index 0000000..b632f3b --- /dev/null +++ b/test/io5.c @@ -0,0 +1,95 @@ +#include "socket.h" +#include "io.h" +#include "buffer.h" +#include "ip6.h" +#include + +main() { + int s=socket_tcp6(); + uint32 scope_id; + char ip[16]; + uint16 port; + if (socket_bind6_reuse(s,V6any,1234,0)==-1) return 111; + if (socket_listen(s,16)==-1) return 111; + io_nonblock(s); + if (!io_fd(s)) return 111; + io_wantread(s); + buffer_puts(buffer_2,"listening on port 1234 (fd #"); + buffer_putulong(buffer_2,s); + buffer_putsflush(buffer_2,")\n"); + for (;;) { + int64 i; + io_wait(); + buffer_putsflush(buffer_2,"io_wait() returned!\n"); + while ((i=io_canread())!=-1) { + if (i==s) { + int n; + while ((n=socket_accept6(s,ip,&port,&scope_id))!=-1) { + char buf[IP6_FMT]; + buffer_puts(buffer_2,"accepted new connection from "); + buffer_put(buffer_2,buf,fmt_ip6(buf,ip)); + buffer_puts(buffer_2,":"); + buffer_putulong(buffer_2,port); + buffer_puts(buffer_2," (fd "); + buffer_putulong(buffer_2,n); + buffer_puts(buffer_2,")"); + if (io_fd(n)) { + io_wantread(n); + } else { + buffer_puts(buffer_2,", but io_fd failed."); + io_close(n); + } + buffer_putnlflush(buffer_2); + } + if (errno!=EAGAIN) { + buffer_puts(buffer_2,"socket_accept6: "); + buffer_puterror(buffer_2); + buffer_putnlflush(buffer_2); + } + } else { + char buf[1024]; + int l=io_tryread(i,buf,sizeof buf); + if (l==-1) { + buffer_puts(buffer_2,"io_tryread("); + buffer_putulong(buffer_2,i); + buffer_puts(buffer_2,"): "); + buffer_puterror(buffer_2); + buffer_putnlflush(buffer_2); + io_close(i); + } else if (l==0) { + buffer_puts(buffer_2,"eof on fd #"); + buffer_putulong(buffer_2,i); + buffer_putnlflush(buffer_2); + io_close(i); + } else { + int r; + switch (r=io_trywrite(i,buf,l)) { + case -1: + buffer_puts(buffer_2,"io_tryread("); + buffer_putulong(buffer_2,i); + buffer_puts(buffer_2,"): "); + buffer_puterror(buffer_2); + buffer_putnlflush(buffer_2); + io_close(i); + break; + case 0: + buffer_puts(buffer_2,"write eof on fd #"); + buffer_putulong(buffer_2,i); + buffer_putnlflush(buffer_2); + io_close(i); + default: + if (r!=l) { + buffer_puts(buffer_2,"short write on fd #"); + buffer_putulong(buffer_2,i); + buffer_puts(buffer_2,": wrote "); + buffer_putulong(buffer_2,r); + buffer_puts(buffer_2,", wanted to write "); + buffer_putulong(buffer_2,l); + buffer_putsflush(buffer_2,").\n"); + } + } + } + } + } + } +}