diff --git a/io/iom_wait.c b/io/iom_wait.c index eaa6fd8..086e6cd 100644 --- a/io/iom_wait.c +++ b/io/iom_wait.c @@ -9,39 +9,62 @@ #endif #include +// return the next event, waiting if none are queued +// wait at most timeout milliseconds +// on success, return 1 and return the fd in *s and the events on it in *revents +// if we waited but ran into a timeout, return 0 +// if we run into a system error, return -1 +// if another thread aborted this iomux, return -2 int iom_wait(iomux_t* c,int64* s,unsigned int* revents,unsigned long timeout) { for (;;) { /* If we have an event in the queue, use that one */ int r; if (c->working==-2) return -2; /* iomux was aborted */ - for (;;) { + + for (;;) { // CAS-loop get the first element from the queue unsigned int f=c->l; if (f == c->h) break; /* no elements in queue */ int n=(f+1)%SLOTS; + + *s=c->q[f].fd; + *revents=c->q[f].events; + if (__sync_bool_compare_and_swap(&c->l,f,n)) { - /* we got one, and its index is in f */ - *s=c->q[f].fd; - *revents=c->q[f].events; + /* we got one */ + return 1; } /* collided with another thread, try again */ } + /* The queue was empty. If someone else is already calling * epoll_wait/kevent, then use the semaphore */ if (__sync_bool_compare_and_swap(&c->working,0,1)) { + /* race avoidance: another thread could have come in between and + * refilled the job queue */ + if (c->h != c->l) { + /* set working back to 0 unless someone set it to -2 in the mean time (iom_abort) */ + if (__sync_val_compare_and_swap(&c->working,1,0)==-2) return -2; + continue; + } + /* we have the job to fill the struct. */ - int freeslots = (c->h - c->l); - if (!freeslots) freeslots=SLOTS; #ifdef HAVE_EPOLL struct epoll_event ee[SLOTS]; int i; - r=epoll_wait(c->ctx, ee, freeslots, timeout); - if (r<=0) { - /* we ran into a timeout, so let someone else take over */ + r=epoll_wait(c->ctx, ee, SLOTS, timeout); + if (r <= 0) { + /* epoll_wait returned a timeout or an error! 
*/ + /* relinquish the lock and return 0 / -1 */ if (__sync_val_compare_and_swap(&c->working,1,0)==-2) return -2; #ifdef __dietlibc__ - cnd_broadcast(&c->sem); + // for timeout we want to hand off to one other thread, no need + // to wake them all up. Error might be transient (EINTR) and the + // next guy might succeed, so only wake one up. If the error was + // not transient, then they will also get an error and wake the + // next up + cnd_signal(&c->sem); #else sem_post(&c->sem); #endif @@ -56,7 +79,10 @@ int iom_wait(iomux_t* c,int64* s,unsigned int* revents,unsigned long timeout) { /* return last event instead of enqueueing it */ *s=ee[i].data.fd; *revents=e; + /* loop terminates here, but no return statement because we + * still need to signal the semaphore below */ } else { + /* The CAS loop on c->working above ensures we are the only one writing to c->h */ c->q[c->h].fd=ee[i].data.fd; c->q[c->h].events=e; c->h = (c->h + 1) % SLOTS; @@ -65,12 +91,14 @@ int iom_wait(iomux_t* c,int64* s,unsigned int* revents,unsigned long timeout) { #elif defined(HAVE_KQUEUE) struct kevent kev[SLOTS]; struct timespec ts = { .tv_sec=timeout/1000, .tv_nsec=(timeout%1000)*1000000 }; - int r=kevent(c->ctx, 0, 0, kev, freeslots, &ts); + int r=kevent(c->ctx, 0, 0, kev, SLOTS, &ts); int i; if (r<=0) { - /* we ran into a timeout, so let someone else take over */ + /* kevent returned a timeout or an error! 
*/ + /* relinquish the lock and return 0 / -1 */ if (__sync_val_compare_and_swap(&c->working,1,0)==-2) return -2; #ifdef __dietlibc__ + // no dietlibc for kqueue based systems yet cnd_broadcast(&c->sem); #else sem_post(&c->sem); @@ -85,7 +113,10 @@ int iom_wait(iomux_t* c,int64* s,unsigned int* revents,unsigned long timeout) { /* return last event instead of enqueueing it */ *s=kev[i].ident; *revents=e; + /* loop terminates here, but no return statement because we + * still need to signal the semaphore below */ } else { + /* The CAS loop on c->working above ensures we are the only one writing to c->h */ c->q[c->h].fd=kev[i].ident; c->q[c->h].events=e; c->h = (c->h + 1) % SLOTS; @@ -100,7 +131,10 @@ int iom_wait(iomux_t* c,int64* s,unsigned int* revents,unsigned long timeout) { doing it anymore */ if (__sync_val_compare_and_swap(&c->working,1,0)==-2) return -2; #ifdef __dietlibc__ - cnd_signal(&c->sem); + if (c->h == (c->l + 1) % SLOTS) + cnd_signal(&c->sem); + else + cnd_broadcast(&c->sem); #else sem_post(&c->sem); #endif